flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/4] flink git commit: [FLINK-1399] Add support for registering Serializers with Kryo
Date Sun, 18 Jan 2015 23:29:16 GMT
Repository: flink
Updated Branches:
  refs/heads/master d3072ba57 -> 29c54a20d


[FLINK-1399] Add support for registering Serializers with Kryo

This closes #305


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d4ed15e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d4ed15e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d4ed15e

Branch: refs/heads/master
Commit: 2d4ed15e8d2aa1a1fcd7001fccea2ee70cc8a8b0
Parents: d3072ba
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Tue Jan 13 16:13:55 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Sun Jan 18 13:26:04 2015 -0800

----------------------------------------------------------------------
 .../flink/api/java/ExecutionEnvironment.java    | 18 +++++++++
 .../java/typeutils/runtime/KryoSerializer.java  | 42 +++++++++++++++++++-
 .../flink/api/scala/ExecutionEnvironment.scala  | 18 +++++++++
 3 files changed, 76 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2d4ed15e/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index f7057ef..1753af8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -28,6 +28,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 
+import com.esotericsoftware.kryo.Serializer;
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
@@ -54,6 +55,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.KryoSerializer;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.NumberSequenceIterator;
@@ -198,6 +200,22 @@ public abstract class ExecutionEnvironment {
 	public String getIdString() {
 		return this.executionId.toString();
 	}
+
+	/**
+	 * Registers the given Serializer as a default serializer for the given class at the
+	 * {@link org.apache.flink.api.java.typeutils.runtime.KryoSerializer}.
+	 */
+	public void addDefaultKryoSerializer(Class<?> clazz, Serializer<?> serializer)
{
+		KryoSerializer.addDefaultSerializer(clazz, serializer);
+	}
+
+	/**
+	 * Registers the given Serializer class as a default serializer for the given class at the
+	 * {@link org.apache.flink.api.java.typeutils.runtime.KryoSerializer}.
+	 */
+	public void addDefaultKryoSerializer(Class<?> clazz, Class<? extends Serializer<?>>
serializer) {
+		KryoSerializer.addDefaultSerializer(clazz, serializer);
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	//  Data set creations

http://git-wip-us.apache.org/repos/asf/flink/blob/2d4ed15e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
index 0a88606..0a5c4d9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
@@ -33,9 +34,17 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 public class KryoSerializer<T> extends TypeSerializer<T> {
-	private static final long serialVersionUID = 2L;
+	private static final long serialVersionUID = 3L;
+
+	private static Map<Class<?>, Serializer<?>> staticRegisteredSerializers
= new HashMap<Class<?>, Serializer<?>>();
+	private static Map<Class<?>, Class<? extends Serializer<?>>> staticRegisteredSerializersClasses
= new HashMap<Class<?>, Class<? extends Serializer<?>>>();
+
+	private static Map<Class<?>, Serializer<?>> registeredSerializers;
+	private static Map<Class<?>, Class<? extends Serializer<?>>> registeredSerializersClasses;
 
 	private final Class<T> type;
 
@@ -53,6 +62,9 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 			throw new NullPointerException("Type class cannot be null.");
 		}
 		this.type = type;
+
+		this.registeredSerializers = staticRegisteredSerializers;
+		this.registeredSerializersClasses = staticRegisteredSerializersClasses;
 	}
 
 
@@ -182,12 +194,38 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 		if (this.kryo == null) {
 			this.kryo = new ScalaKryoInstantiator().newKryo();
 			this.kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());
+
+			for (Map.Entry<Class<?>, Serializer<?>> e: registeredSerializers.entrySet())
{
+				this.kryo.addDefaultSerializer(e.getKey(), e.getValue());
+			}
+
+			for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> e: registeredSerializersClasses.entrySet())
{
+				this.kryo.addDefaultSerializer(e.getKey(), e.getValue());
+			}
+
 			this.kryo.setRegistrationRequired(false);
 			this.kryo.register(type);
+
 			this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
 		}
 	}
-	
+
+	/**
+	 * Registers the given Serializer as a default serializer for the given class at the Kryo
+	 * instance.
+	 */
+	public static void addDefaultSerializer(Class<?> clazz, Serializer<?> serializer)
{
+		staticRegisteredSerializers.put(clazz, serializer);
+	}
+
+	/**
+	 * Registers the given Serializer class as a default serializer for the given class at the Kryo
+	 * instance.
+	 */
+	public static void addDefaultSerializer(Class<?> clazz, Class<? extends Serializer<?>>
serializer) {
+		staticRegisteredSerializersClasses.put(clazz, serializer);
+	}
+
 	// --------------------------------------------------------------------------------------------
 	// for testing
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2d4ed15e/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index db2e864..565b9b1 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -19,11 +19,13 @@ package org.apache.flink.api.scala
 
 import java.util.UUID
 
+import com.esotericsoftware.kryo.Serializer
 import org.apache.commons.lang3.Validate
 import org.apache.flink.api.common.{ExecutionConfig, JobExecutionResult}
 import org.apache.flink.api.java.io._
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.typeutils.runtime.KryoSerializer
 import org.apache.flink.api.java.typeutils.{ValueTypeInfo, TupleTypeInfoBase}
 import org.apache.flink.api.scala.operators.ScalaCsvInputFormat
 import org.apache.flink.core.fs.Path
@@ -122,6 +124,22 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   }
 
   /**
+   * Registers the given Serializer as a default serializer for the given class at the
+   * [[KryoSerializer]].
+   */
+  def addDefaultKryoSerializer(clazz: Class[_], serializer: Serializer[_]): Unit = {
+    javaEnv.addDefaultKryoSerializer(clazz, serializer)
+  }
+
+  /**
+   * Registers the given Serializer class as a default serializer for the given class at the
+   * [[KryoSerializer]].
+   */
+  def addDefaultKryoSerializer(clazz: Class[_], serializer: Class[_ <: Serializer[_]])
{
+    javaEnv.addDefaultKryoSerializer(clazz, serializer)
+  }
+
+  /**
    * Creates a DataSet of Strings produced by reading the given file line wise.
    *
    * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or


Mime
View raw message