flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject flink git commit: [FLINK-6025] [core] Add Flink's own JavaSerializer for Kryo serialization
Date Mon, 13 Mar 2017 15:43:45 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.1 01703e60e -> e50bf6506


[FLINK-6025] [core] Add Flink's own JavaSerializer for Kryo serialization

This commit adds a reimplemented JavaSerializer to be registered with
Kryo. This is due to a known issue with Kryo's JavaSerializer that may
use the wrong classloader for deserialization.

Instead of registering Kryo's JavaSerializer for Throwables, Flink now
registers the reimplemented JavaSerializer. Users who run into
ClassNotFoundExceptions when using Kryo's JavaSerializer for their own
types are also advised to switch to Flink's JavaSerializer.

This closes #3519.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e50bf650
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e50bf650
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e50bf650

Branch: refs/heads/release-1.1
Commit: e50bf65065722ea02a40406c2c65eed3d65a814a
Parents: 01703e6
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Sun Mar 12 22:46:27 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Mon Mar 13 23:42:38 2017 +0800

----------------------------------------------------------------------
 docs/apis/best_practices.md                     | 13 ++++
 .../typeutils/runtime/kryo/JavaSerializer.java  | 82 ++++++++++++++++++++
 .../typeutils/runtime/kryo/KryoSerializer.java  |  5 +-
 .../apache/flink/util/InstantiationUtil.java    |  4 +-
 4 files changed, 99 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e50bf650/docs/apis/best_practices.md
----------------------------------------------------------------------
diff --git a/docs/apis/best_practices.md b/docs/apis/best_practices.md
index 7ae1b64..7e0c3e4 100644
--- a/docs/apis/best_practices.md
+++ b/docs/apis/best_practices.md
@@ -270,6 +270,19 @@ For Google Protobuf you need the following Maven dependency:
 
 Please adjust the versions of both libraries as needed.
 
+### Issue with using Kryo's `JavaSerializer` 
+  
+If you register Kryo's `JavaSerializer` for your custom type, you may
+encounter `ClassNotFoundException`s even though your custom type class is
+included in the submitted user code jar. This is due to a known issue with
+Kryo's `JavaSerializer`, which may use the wrong classloader.
+
+In this case, you should use `org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer`
+instead to resolve the issue. This is a reimplemented `JavaSerializer` in Flink
+that makes sure the user code classloader is used.
+
+Please refer to [FLINK-6025](https://issues.apache.org/jira/browse/FLINK-6025)
+for more details.
 
 ## Using Logback instead of Log4j
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e50bf650/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
new file mode 100644
index 0000000..a51647c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.util.ObjectMap;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * This is a reimplementation of Kryo's {@link com.esotericsoftware.kryo.serializers.JavaSerializer},
+ * that additionally makes sure the {@link ObjectInputStream} used for deserialization specifically uses Kryo's
+ * registered classloader.
+ *
+ * Flink maintains this reimplementation due to a known issue with Kryo's {@code JavaSerializer}, in which the wrong
+ * classloader may be used for deserialization, leading to {@link ClassNotFoundException}s.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/FLINK-6025">FLINK-6025</a>
+ * @see <a href="https://github.com/EsotericSoftware/kryo/pull/483">Known issue with Kryo's JavaSerializer</a>
+ *
+ * @param <T> The type to be serialized.
+ */
+public class JavaSerializer<T> extends Serializer<T> {
+
+	public JavaSerializer() {}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void write(Kryo kryo, Output output, T o) {
+		try {
+			ObjectMap graphContext = kryo.getGraphContext();
+			ObjectOutputStream objectStream = (ObjectOutputStream)graphContext.get(this);
+			if (objectStream == null) {
+				objectStream = new ObjectOutputStream(output);
+				graphContext.put(this, objectStream);
+			}
+			objectStream.writeObject(o);
+			objectStream.flush();
+		} catch (Exception ex) {
+			throw new KryoException("Error during Java serialization.", ex);
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public T read(Kryo kryo, Input input, Class aClass) {
+		try {
+			ObjectMap graphContext = kryo.getGraphContext();
+			ObjectInputStream objectStream = (ObjectInputStream)graphContext.get(this);
+			if (objectStream == null) {
+				// make sure we use Kryo's classloader
+				objectStream = new InstantiationUtil.ClassLoaderObjectInputStream(input, kryo.getClassLoader());
+				graphContext.put(this, objectStream);
+			}
+			return (T) objectStream.readObject();
+		} catch (Exception ex) {
+			throw new KryoException("Error during Java deserialization.", ex);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e50bf650/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index e74e251..44c952a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -24,7 +24,6 @@ import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
 
 import org.apache.avro.generic.GenericData;
 
@@ -130,7 +129,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 
 	@Override
 	public KryoSerializer<T> duplicate() {
-		return new KryoSerializer<T>(this);
+		return new KryoSerializer<>(this);
 	}
 
 	@Override
@@ -331,6 +330,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 			kryo.setReferences(true);
 			
 			// Throwable and all subclasses should be serialized via java serialization
+			// Note: the registered JavaSerializer is Flink's own implementation, and not Kryo's.
+			//       This is due to a known issue with Kryo's JavaSerializer. See FLINK-6025 for details.
 			kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());
 
 			// Add default serializers first, so that they type registrations without a serializer

http://git-wip-us.apache.org/repos/asf/flink/blob/e50bf650/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index bcf71f0..70464e7 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -44,9 +44,7 @@ import java.util.HashMap;
 public final class InstantiationUtil {
 	
 	/**
-	 * A custom ObjectInputStream that can also load user-code using a
-	 * user-code ClassLoader.
-	 *
+	 * A custom ObjectInputStream that can load classes using a specific ClassLoader.
 	 */
 	public static class ClassLoaderObjectInputStream extends ObjectInputStream {
 


Mime
View raw message