flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/2] flink git commit: [FLINK-1395] Add tests for custom serializers with JodaTime.
Date Mon, 19 Jan 2015 05:46:08 GMT
Repository: flink
Updated Branches:
  refs/heads/master 29c54a20d -> 6cc35837f


[FLINK-1395] Add tests for custom serializers with JodaTime.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/020b282b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/020b282b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/020b282b

Branch: refs/heads/master
Commit: 020b282bdc5468aa51b231e9ae8d4d3a1a76e696
Parents: 29c54a2
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Tue Jan 13 15:23:46 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Sun Jan 18 18:25:28 2015 -0800

----------------------------------------------------------------------
 .../common/typeutils/SerializerTestBase.java    |  21 ++-
 flink-dist/src/main/flink-bin/LICENSE           |   3 +
 flink-java/pom.xml                              |   7 +
 .../AbstractGenericTypeSerializerTest.java      |   4 +-
 .../runtime/KryoGenericTypeSerializerTest.java  |   2 +
 .../runtime/KryoWithCustomSerializersTest.java  |  70 ++++++++
 .../runtime/KryoGenericTypeSerializerTest.scala | 135 ---------------
 flink-tests/pom.xml                             |   7 +
 .../runtime/KryoGenericTypeSerializerTest.scala | 163 +++++++++++++++++++
 .../api/scala/runtime/TupleSerializerTest.scala |  23 ++-
 10 files changed, 294 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/020b282b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index 4835b4f..1531ae6 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -35,13 +35,17 @@ import org.junit.Assert;
 
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.junit.Test;
 
 /**
  * Abstract test base for serializers.
+ *
+ * We have a toString() call on all deserialized
+ * values because this is further evidence that the deserialized value is actually correct.
+ * (A JodaTime DateTime with the default KryoSerializer used to pass this test, but the
+ * internal state would be corrupt, which becomes evident when toString is called.)
  */
 public abstract class SerializerTestBase<T> {
 	
@@ -99,6 +103,7 @@ public abstract class SerializerTestBase<T> {
 			
 			for (T datum : testData) {
 				T copy = serializer.copy(datum);
+				copy.toString();
 				deepEquals("Copied element is not equal to the original element.", datum, copy);
 			}
 		}
@@ -117,6 +122,7 @@ public abstract class SerializerTestBase<T> {
 			
 			for (T datum : testData) {
 				T copy = serializer.copy(datum, serializer.createInstance());
+				copy.toString();
 				deepEquals("Copied element is not equal to the original element.", datum, copy);
 			}
 		}
@@ -137,6 +143,7 @@ public abstract class SerializerTestBase<T> {
 			
 			for (T datum : testData) {
 				T copy = serializer.copy(datum, target);
+				copy.toString();
 				deepEquals("Copied element is not equal to the original element.", datum, copy);
 				target = copy;
 			}
@@ -162,6 +169,8 @@ public abstract class SerializerTestBase<T> {
 				assertTrue("No data available during deserialization.", in.available() > 0);
 				
 				T deserialized = serializer.deserialize(serializer.createInstance(), in);
+ 				deserialized.toString();
+
 				deepEquals("Deserialized value if wrong.", value, deserialized);
 				
 				assertTrue("Trailing data available after deserialization.", in.available() == 0);
@@ -190,6 +199,8 @@ public abstract class SerializerTestBase<T> {
 				assertTrue("No data available during deserialization.", in.available() > 0);
 				
 				T deserialized = serializer.deserialize(reuseValue, in);
+				deserialized.toString();
+
 				deepEquals("Deserialized value if wrong.", value, deserialized);
 				
 				assertTrue("Trailing data available after deserialization.", in.available() == 0);
@@ -220,6 +231,8 @@ public abstract class SerializerTestBase<T> {
 			int num = 0;
 			while (in.available() > 0) {
 				T deserialized = serializer.deserialize(in);
+				deserialized.toString();
+
 				deepEquals("Deserialized value if wrong.", testData[num], deserialized);
 				num++;
 			}
@@ -250,6 +263,8 @@ public abstract class SerializerTestBase<T> {
 			int num = 0;
 			while (in.available() > 0) {
 				T deserialized = serializer.deserialize(reuseValue, in);
+				deserialized.toString();
+
 				deepEquals("Deserialized value if wrong.", testData[num], deserialized);
 				reuseValue = deserialized;
 				num++;
@@ -283,6 +298,8 @@ public abstract class SerializerTestBase<T> {
 				assertTrue("No data available copying.", toVerify.available() > 0);
 				
 				T deserialized = serializer.deserialize(serializer.createInstance(), toVerify);
+				deserialized.toString();
+
 				deepEquals("Deserialized value if wrong.", value, deserialized);
 				
 				assertTrue("Trailing data available after deserialization.", toVerify.available() == 0);
@@ -318,6 +335,8 @@ public abstract class SerializerTestBase<T> {
 			
 			while (toVerify.available() > 0) {
 				T deserialized = serializer.deserialize(serializer.createInstance(), toVerify);
+				deserialized.toString();
+
 				deepEquals("Deserialized value if wrong.", testData[num], deserialized);
 				num++;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/020b282b/flink-dist/src/main/flink-bin/LICENSE
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/LICENSE b/flink-dist/src/main/flink-bin/LICENSE
index a18ebb9..2838497 100644
--- a/flink-dist/src/main/flink-bin/LICENSE
+++ b/flink-dist/src/main/flink-bin/LICENSE
@@ -238,6 +238,7 @@ under the Apache License (v 2.0):
  - Chill_2.10 v0.5.1 (https://github.com/twitter/chill)
  - Jetty Web Container (http://www.eclipse.org/jetty/)
  - Amazon Web Services SDK for Java (http://aws.amazon.com/sdkforjava/)
+ - ScalaTest (http://www.scalatest.org)
  - StartBootstrap (http://startbootstrap.com)
  - CHAP Links Library Timeline (http://almende.github.io/chap-links-library/)
  - Twitter Hosebird Client (hbc) (https://github.com/twitter/hbc)
@@ -298,6 +299,7 @@ BSD-style licenses:
  - Kryo (https://github.com/EsotericSoftware/kryo) - Copyright (c) 2008, Nathan Sweet
  - D3 (http://d3js.org/) - Copyright (c) 2010-2014, Michael Bostock
  - LevelDB JNI (https://github.com/fusesource/leveldbjni/) - Copyright (c) 2011, FuseSource Corp.
+ - Memcached (https://github.com/memcached/memcached) - Copyright (c) 2003, Danga Interactive, Inc.
  - Redis (http://redis.io/) - Copyright (c) 2009, Salvatore Sanfilippo and Pieter Noordhuis
  
 [BSD-like License]
@@ -461,6 +463,7 @@ For src/main/native/src/org/apache/hadoop/io/compress/lz4/{lz4.h,lz4.c,lz4hc.h,l
    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
    You can contact the author at :
+   - LZ4 homepage : http://fastcompression.blogspot.com/p/lz4.html
    - LZ4 source repository : http://code.google.com/p/lz4/
    - LZ4 public forum : https://groups.google.com/forum/#!forum/lz4c
 */

http://git-wip-us.apache.org/repos/asf/flink/blob/020b282b/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 8abe3d4..fa5a1d6 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -78,6 +78,13 @@ under the License.
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
+		
+		<dependency>
+			<groupId>joda-time</groupId>
+			<artifactId>joda-time</artifactId>
+			<scope>test</scope>
+		</dependency>
+		
 	</dependencies>
 
 	<!-- Because flink-scala uses it in tests -->

http://git-wip-us.apache.org/repos/asf/flink/blob/020b282b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
index d604105..2a11bd7 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
@@ -21,10 +21,13 @@ package org.apache.flink.api.java.typeutils.runtime;
 import org.apache.flink.api.common.typeutils.SerializerTestInstance;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.util.StringUtils;
+import org.joda.time.DateTime;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -36,7 +39,6 @@ abstract public class AbstractGenericTypeSerializerTest {
 
 	private final Random rnd = new Random(349712539451944123L);
 
-
 	@Test
 	public void testString() {
 		runTests("abc", "",

http://git-wip-us.apache.org/repos/asf/flink/blob/020b282b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
index d0fc6ed..5953599 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
@@ -50,6 +50,8 @@ public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializer
 		runTests(b);
 	}
 
+
+
 	@Test
 	public void testJavaDequeue(){
 		Collection<Integer> c = new LinkedList<Integer>();

http://git-wip-us.apache.org/repos/asf/flink/blob/020b282b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoWithCustomSerializersTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoWithCustomSerializersTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoWithCustomSerializersTest.java
new file mode 100644
index 0000000..9d7ab61
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoWithCustomSerializersTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.util.Collection;
+import java.util.HashSet;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.joda.time.LocalDate;
+import org.junit.Test;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+@SuppressWarnings("unchecked")
+public class KryoWithCustomSerializersTest extends AbstractGenericTypeSerializerTest {
+	
+
+	@Test
+	public void testJodaTime(){
+		Collection<LocalDate> b = new HashSet<LocalDate>();
+
+		b.add(new LocalDate(1L));
+		b.add(new LocalDate(2L));
+		
+		KryoSerializer.registerSerializer(LocalDate.class, LocalDateSerializer.class);
+		
+		runTests(b);
+	}
+
+	@Override
+	protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
+		return new KryoSerializer<T>(type);
+	}
+	
+	public static final class LocalDateSerializer extends Serializer<LocalDate> implements java.io.Serializable {
+		
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void write(Kryo kryo, Output output, LocalDate object) {
+			output.writeInt(object.getYear());
+			output.writeInt(object.getMonthOfYear());
+			output.writeInt(object.getDayOfMonth());
+		}
+		
+		@Override
+		public LocalDate read(Kryo kryo, Input input, Class<LocalDate> type) {
+			return new LocalDate(input.readInt(), input.readInt(), input.readInt());
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/020b282b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
deleted file mode 100644
index 6f27c16..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.runtime
-
-import org.apache.flink.api.common.typeutils.SerializerTestInstance
-import org.apache.flink.api.java.typeutils.GenericTypeInfo
-import org.junit.Test
-
-import scala.reflect._
-
-class KryoGenericTypeSerializerTest {
-
-  @Test
-  def testThrowableSerialization: Unit = {
-    val a = List(new RuntimeException("Hello"), new RuntimeException("there"))
-
-    runTests(a)
-  }
-
-  @Test
-  def testScalaListSerialization: Unit = {
-    val a = List(42,1,49,1337)
-
-    runTests(a)
-  }
-
-  @Test
-  def testScalaMutablelistSerialization: Unit = {
-    val a = scala.collection.mutable.ListBuffer(42,1,49,1337)
-
-    runTests(a)
-  }
-
-  @Test
-  def testScalaMapSerialization: Unit = {
-    val a = Map(("1" -> 1), ("2" -> 2), ("42" -> 42), ("1337" -> 1337))
-
-    runTests(Seq(a))
-  }
-
-  @Test
-  def testMutableMapSerialization: Unit ={
-    val a = scala.collection.mutable.Map((1 -> "1"), (2 -> "2"), (3 -> "3"))
-
-    runTests(Seq(a))
-  }
-
-  @Test
-  def testScalaListComplexTypeSerialization: Unit = {
-    val a = ComplexType("1234", 42, List(1,2,3,4))
-    val b = ComplexType("4321", 24, List(4,3,2,1))
-    val c = ComplexType("1337", 1, List(1))
-    val list = List(a, b, c)
-
-    runTests(list)
-  }
-
-  @Test
-  def testHeterogenousScalaList: Unit = {
-    val a = new DerivedType("foo", "bar")
-    val b = new BaseType("foobar")
-    val c = new DerivedType2("bar", "foo")
-    val list = List(a,b,c)
-
-    runTests(list)
-  }
-
-  case class ComplexType(id: String, number: Int, values: List[Int]){
-    override def equals(obj: Any): Boolean ={
-      if(obj != null && obj.isInstanceOf[ComplexType]){
-        val complexType = obj.asInstanceOf[ComplexType]
-        id.equals(complexType.id) && number.equals(complexType.number) && values.equals(
-          complexType.values)
-      }else{
-        false
-      }
-    }
-  }
-
-  class BaseType(val name: String){
-    override def equals(obj: Any): Boolean = {
-      if(obj != null && obj.isInstanceOf[BaseType]){
-        obj.asInstanceOf[BaseType].name.equals(name)
-      }else{
-        false
-      }
-    }
-  }
-
-  class DerivedType(name: String, val sub: String) extends BaseType(name){
-    override def equals(obj: Any): Boolean = {
-      if(obj != null && obj.isInstanceOf[DerivedType]){
-        super.equals(obj) && obj.asInstanceOf[DerivedType].sub.equals(sub)
-      }else{
-        false
-      }
-    }
-  }
-
-  class DerivedType2(name: String, val sub: String) extends BaseType(name){
-    override def equals(obj: Any): Boolean = {
-      if(obj != null && obj.isInstanceOf[DerivedType2]){
-        super.equals(obj) && obj.asInstanceOf[DerivedType2].sub.equals(sub)
-      }else{
-        false
-      }
-    }
-  }
-
-  def runTests[T : ClassTag](objects: Seq[T]): Unit ={
-    val clsTag = classTag[T]
-    val typeInfo = new GenericTypeInfo[T](clsTag.runtimeClass.asInstanceOf[Class[T]])
-    val serializer = typeInfo.createSerializer()
-    val typeClass = typeInfo.getTypeClass
-
-    val instance = new SerializerTestInstance[T](serializer, typeClass, -1, objects: _*)
-
-    instance.testAll()
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/020b282b/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 21ccbdb..6e10510 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -128,6 +128,13 @@ under the License.
 			<artifactId>scalatest_2.10</artifactId>
 			<scope>test</scope>
 		</dependency>
+		
+		<dependency>
+			<groupId>joda-time</groupId>
+			<artifactId>joda-time</artifactId>
+			<scope>test</scope>
+		</dependency>
+		
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/020b282b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
new file mode 100644
index 0000000..37e334b
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.runtime
+
+import org.apache.flink.api.common.typeutils.SerializerTestInstance
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
+import org.joda.time.DateTime
+import org.junit.Test
+import scala.reflect._
+import org.joda.time.LocalDate
+import org.apache.flink.api.java.typeutils.runtime.KryoSerializer
+import com.esotericsoftware.kryo.Serializer
+import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.Output
+import com.esotericsoftware.kryo.io.Input
+
+class KryoGenericTypeSerializerTest {
+
+  @Test
+  def testThrowableSerialization: Unit = {
+    val a = List(new RuntimeException("Hello"), new RuntimeException("there"))
+
+    runTests(a)
+  }
+
+  @Test
+  def jodaSerialization: Unit = {
+    val a = List(new LocalDate(1), new LocalDate(2))
+    
+    KryoSerializer.registerSerializer(classOf[LocalDate], new LocalDateSerializer())
+
+    runTests(a)
+  }
+
+  @Test
+  def testScalaListSerialization: Unit = {
+    val a = List(42,1,49,1337)
+
+    runTests(a)
+  }
+
+  @Test
+  def testScalaMutablelistSerialization: Unit = {
+    val a = scala.collection.mutable.ListBuffer(42,1,49,1337)
+
+    runTests(a)
+  }
+
+  @Test
+  def testScalaMapSerialization: Unit = {
+    val a = Map(("1" -> 1), ("2" -> 2), ("42" -> 42), ("1337" -> 1337))
+
+    runTests(Seq(a))
+  }
+
+  @Test
+  def testMutableMapSerialization: Unit ={
+    val a = scala.collection.mutable.Map((1 -> "1"), (2 -> "2"), (3 -> "3"))
+
+    runTests(Seq(a))
+  }
+
+  @Test
+  def testScalaListComplexTypeSerialization: Unit = {
+    val a = ComplexType("1234", 42, List(1,2,3,4))
+    val b = ComplexType("4321", 24, List(4,3,2,1))
+    val c = ComplexType("1337", 1, List(1))
+    val list = List(a, b, c)
+
+    runTests(list)
+  }
+
+  @Test
+  def testHeterogenousScalaList: Unit = {
+    val a = new DerivedType("foo", "bar")
+    val b = new BaseType("foobar")
+    val c = new DerivedType2("bar", "foo")
+    val list = List(a,b,c)
+
+    runTests(list)
+  }
+
+  case class ComplexType(id: String, number: Int, values: List[Int]){
+    override def equals(obj: Any): Boolean ={
+      if(obj != null && obj.isInstanceOf[ComplexType]){
+        val complexType = obj.asInstanceOf[ComplexType]
+        id.equals(complexType.id) && number.equals(complexType.number) && values.equals(
+          complexType.values)
+      }else{
+        false
+      }
+    }
+  }
+
+  class BaseType(val name: String){
+    override def equals(obj: Any): Boolean = {
+      if(obj != null && obj.isInstanceOf[BaseType]){
+        obj.asInstanceOf[BaseType].name.equals(name)
+      }else{
+        false
+      }
+    }
+  }
+
+  class DerivedType(name: String, val sub: String) extends BaseType(name){
+    override def equals(obj: Any): Boolean = {
+      if(obj != null && obj.isInstanceOf[DerivedType]){
+        super.equals(obj) && obj.asInstanceOf[DerivedType].sub.equals(sub)
+      }else{
+        false
+      }
+    }
+  }
+
+  class DerivedType2(name: String, val sub: String) extends BaseType(name){
+    override def equals(obj: Any): Boolean = {
+      if(obj != null && obj.isInstanceOf[DerivedType2]){
+        super.equals(obj) && obj.asInstanceOf[DerivedType2].sub.equals(sub)
+      }else{
+        false
+      }
+    }
+  }
+
+  def runTests[T : ClassTag](objects: Seq[T]): Unit ={
+    val clsTag = classTag[T]
+    val typeInfo = new GenericTypeInfo[T](clsTag.runtimeClass.asInstanceOf[Class[T]])
+    val serializer = typeInfo.createSerializer()
+    val typeClass = typeInfo.getTypeClass
+
+    val instance = new SerializerTestInstance[T](serializer, typeClass, -1, objects: _*)
+
+    instance.testAll()
+  }
+}
+
+class LocalDateSerializer extends Serializer[LocalDate] with java.io.Serializable {
+
+  override def write(kryo: Kryo, output: Output, obj: LocalDate) {
+    output.writeInt(obj.getYear());
+    output.writeInt(obj.getMonthOfYear());
+    output.writeInt(obj.getDayOfMonth());
+  }
+
+  override def read(kryo: Kryo, input: Input, typeClass: Class[LocalDate]) : LocalDate = {
+    new LocalDate(input.readInt(), input.readInt(), input.readInt());
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/020b282b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
index 0f27ffa..29e13ec 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
@@ -18,19 +18,17 @@
 package org.apache.flink.api.scala.runtime
 
 import java.util
-
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest._
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.util.StringUtils
+import org.joda.time.LocalDate
 import org.junit.Assert
 import org.junit.Test
-
 import org.apache.flink.api.scala._
-
 import scala.collection.JavaConverters._
-
 import java.util.Random
+import org.apache.flink.api.java.typeutils.runtime.KryoSerializer
 
 class TupleSerializerTest {
 
@@ -93,6 +91,23 @@ class TupleSerializerTest {
   }
 
   @Test
+  def testTuple2StringJodaTime(): Unit = {
+    val rnd: Random = new Random(807346528946L)
+
+    val testTuples = Array(
+      (StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)),
+      (StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)),
+      (StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)),
+      ("", rnd.nextDouble),
+      (StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)),
+      (StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)))
+      
+    KryoSerializer.registerSerializer(classOf[LocalDate], new LocalDateSerializer())
+    
+    runTests(testTuples)
+  }
+
+  @Test
   def testTuple2StringStringArray(): Unit = {
     val rnd: Random = new Random(289347567856686223L)
 


Mime
View raw message