flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/3] flink git commit: [FLINK-1378] Add support for Throwables in KryoSerializer
Date Sun, 11 Jan 2015 20:45:20 GMT
[FLINK-1378] Add support for Throwables in KryoSerializer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8af6ef49
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8af6ef49
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8af6ef49

Branch: refs/heads/master
Commit: 8af6ef49fd84c309aeba0a3c8963ac83e0243c59
Parents: 935e316
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Fri Jan 9 19:04:23 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Sun Jan 11 21:09:53 2015 +0100

----------------------------------------------------------------------
 .../flink/api/common/typeutils/SerializerTestBase.java |  3 +++
 .../api/java/typeutils/runtime/KryoSerializer.java     |  4 +++-
 .../scala/runtime/KryoGenericTypeSerializerTest.scala  | 13 ++++++++++---
 3 files changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8af6ef49/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index 5122af9..4835b4f 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -384,6 +384,9 @@ public abstract class SerializerTestBase<T> {
 				assertArrayEquals(message, (Object[]) should, (Object[]) is);
 			}
 		}
+		else if (should instanceof Throwable) {
+			assertEquals(((Throwable)should).getMessage(), ((Throwable)is).getMessage());
+		}
 		else {
 			assertEquals(message,  should, is);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/8af6ef49/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
index b2c55fb..0a88606 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
@@ -22,6 +22,7 @@ import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.KryoException;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import com.twitter.chill.ScalaKryoInstantiator;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -114,7 +115,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 			output = new Output(outputStream);
 			previousOut = target;
 		}
-		
+
 		try {
 			kryo.writeClassAndObject(output, record);
 			output.flush();
@@ -180,6 +181,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 	private void checkKryoInitialized() {
 		if (this.kryo == null) {
 			this.kryo = new ScalaKryoInstantiator().newKryo();
+			this.kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());
 			this.kryo.setRegistrationRequired(false);
 			this.kryo.register(type);
 			this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader());

http://git-wip-us.apache.org/repos/asf/flink/blob/8af6ef49/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
index cadd7ff..6f27c16 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
@@ -26,6 +26,13 @@ import scala.reflect._
 class KryoGenericTypeSerializerTest {
 
   @Test
+  def testThrowableSerialization: Unit = {
+    val a = List(new RuntimeException("Hello"), new RuntimeException("there"))
+
+    runTests(a)
+  }
+
+  @Test
   def testScalaListSerialization: Unit = {
     val a = List(42,1,49,1337)
 
@@ -43,14 +50,14 @@ class KryoGenericTypeSerializerTest {
   def testScalaMapSerialization: Unit = {
     val a = Map(("1" -> 1), ("2" -> 2), ("42" -> 42), ("1337" -> 1337))
 
-    runTests(a)
+    runTests(Seq(a))
   }
 
   @Test
   def testMutableMapSerialization: Unit ={
     val a = scala.collection.mutable.Map((1 -> "1"), (2 -> "2"), (3 -> "3"))
 
-    runTests(a)
+    runTests(Seq(a))
   }
 
   @Test
@@ -115,7 +122,7 @@ class KryoGenericTypeSerializerTest {
     }
   }
 
-  def runTests[T : ClassTag](objects: T *): Unit ={
+  def runTests[T : ClassTag](objects: Seq[T]): Unit ={
     val clsTag = classTag[T]
     val typeInfo = new GenericTypeInfo[T](clsTag.runtimeClass.asInstanceOf[Class[T]])
     val serializer = typeInfo.createSerializer()


Mime
View raw message