flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject [05/15] flink git commit: [FLINK-6883] [core] Refactor TypeSerializer to not implement TypeDeserializer
Date Tue, 13 Jun 2017 05:49:02 GMT
[FLINK-6883] [core] Refactor TypeSerializer to not implement TypeDeserializer

The separation of the TypeDeserializer interface from the TypeSerializer
base class is due to the fact that additionally implementing the
TypeDeserializer interface alters the generation order of anonymos
serializer classes for Scala case classes and collections.

Instead, the TypeDeserializer is now used as a mixin on the
TypeDeserializerAdapter utility, which now serves as a bridge for
both directions (i.e. TypeSerializer to TypeDeserializer, and vice
versa). No user interfaces are broken due to this change.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/39c8270d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/39c8270d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/39c8270d

Branch: refs/heads/release-1.3
Commit: 39c8270d39684765484fa4b6b2711e5714b81b64
Parents: 57421f9
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Sun Jun 11 15:30:36 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Tue Jun 13 07:20:53 2017 +0200

----------------------------------------------------------------------
 .../common/typeutils/CompatibilityResult.java   | 24 +++++++++++-
 .../typeutils/TypeDeserializerAdapter.java      | 40 ++++++++++++++------
 .../api/common/typeutils/TypeSerializer.java    |  2 +-
 3 files changed, 51 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/39c8270d/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
index 4c83ded..1e05d57 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
@@ -60,6 +60,8 @@ public final class CompatibilityResult<T> {
 	 * @param convertDeserializer the convert deserializer to use, in the case that the preceding
serializer
 	 *                            cannot be found.
 	 *
+	 * @param <T> the type of the data being migrated.
+	 *
 	 * @return a result that signals migration is necessary, also providing a convert deserializer.
 	 */
 	public static <T> CompatibilityResult<T> requiresMigration(@Nonnull TypeDeserializer<T>
convertDeserializer) {
@@ -69,11 +71,29 @@ public final class CompatibilityResult<T> {
 	}
 
 	/**
+	 * Returns a result that signals migration to be performed, and in the case that the preceding
serializer
+	 * cannot be found or restored to read the previous data during migration, a provided convert
serializer
+	 * can be used. The provided serializer will only be used for deserialization.
+	 *
+	 * @param convertSerializer the convert serializer to use, in the case that the preceding
serializer
+	 *                          cannot be found. The provided serializer will only be used for
deserialization.
+	 *
+	 * @param <T> the type of the data being migrated.
+	 *
+	 * @return a result that signals migration is necessary, also providing a convert serializer.
+	 */
+	public static <T> CompatibilityResult<T> requiresMigration(@Nonnull TypeSerializer<T>
convertSerializer) {
+		Preconditions.checkNotNull(convertSerializer, "Convert serializer cannot be null.");
+
+		return new CompatibilityResult<>(true, new TypeDeserializerAdapter<>(convertSerializer));
+	}
+
+	/**
 	 * Returns a result that signals migration to be performed. The migration will fail if the
preceding
 	 * serializer for the previous data cannot be found.
 	 *
-	 * <p>You can also provide a convert deserializer using {@link #requiresMigration(TypeDeserializer)},
-	 * which will be used as a fallback resort in such cases.
+	 * <p>You can also provide a convert deserializer using {@link #requiresMigration(TypeDeserializer)}
+	 * or {@link #requiresMigration(TypeSerializer)}, which will be used as a fallback resort
in such cases.
 	 *
 	 * @return a result that signals migration is necessary, without providing a convert deserializer.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/39c8270d/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
index e02bed4..fb59602 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.typeutils;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
@@ -25,27 +26,42 @@ import org.apache.flink.util.Preconditions;
 import java.io.IOException;
 
 /**
- * A utility class that wraps a {@link TypeDeserializer} as a {@link TypeSerializer}.
+ * A utility class that is used to bridge a {@link TypeSerializer} and {@link TypeDeserializer}.
+ * It either wraps a type deserializer or serializer, and can only ever be used for deserialization
+ * (i.e. only read-related methods is functional).
  *
- * <p>Methods related to deserialization are directly forwarded to the wrapped deserializer,
+ * <p>Methods related to deserialization are directly forwarded to the wrapped deserializer
or serializer,
  * while serialization methods are masked and not intended for use.
  *
  * @param <T> The data type that the deserializer deserializes.
  */
-public final class TypeDeserializerAdapter<T> extends TypeSerializer<T> {
+@Internal
+public final class TypeDeserializerAdapter<T> extends TypeSerializer<T> implements
TypeDeserializer<T> {
 
 	private static final long serialVersionUID = 1L;
 
-	/** The actual wrapped deserializer instance */
+	/** The actual wrapped deserializer or serializer instance */
 	private final TypeDeserializer<T> deserializer;
+	private final TypeSerializer<T> serializer;
 
 	/**
-	 * Creates a {@link TypeSerializer} that wraps a {@link TypeDeserializer}.
+	 * Creates a {@link TypeDeserializerAdapter} that wraps a {@link TypeDeserializer}.
 	 *
 	 * @param deserializer the actual deserializer to wrap.
 	 */
 	public TypeDeserializerAdapter(TypeDeserializer<T> deserializer) {
 		this.deserializer = Preconditions.checkNotNull(deserializer);
+		this.serializer = null;
+	}
+
+	/**
+	 * Creates a {@link TypeDeserializerAdapter} that wraps a {@link TypeSerializer}.
+	 *
+	 * @param serializer the actual serializer to wrap.
+	 */
+	public TypeDeserializerAdapter(TypeSerializer<T> serializer) {
+		this.deserializer = null;
+		this.serializer = Preconditions.checkNotNull(serializer);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -53,31 +69,31 @@ public final class TypeDeserializerAdapter<T> extends TypeSerializer<T>
{
 	// --------------------------------------------------------------------------------------------
 
 	public T deserialize(DataInputView source) throws IOException {
-		return deserializer.deserialize(source);
+		return (deserializer != null) ? deserializer.deserialize(source) : serializer.deserialize(source);
 	}
 
 	public T deserialize(T reuse, DataInputView source) throws IOException {
-		return deserializer.deserialize(reuse, source);
+		return (deserializer != null) ? deserializer.deserialize(reuse, source) : serializer.deserialize(reuse,
source);
 	}
 
 	public TypeSerializer<T> duplicate() {
-		return deserializer.duplicate();
+		return (deserializer != null) ? deserializer.duplicate() : serializer.duplicate();
 	}
 
 	public int getLength() {
-		return deserializer.getLength();
+		return (deserializer != null) ? deserializer.getLength() : serializer.getLength();
 	}
 
 	public boolean equals(Object obj) {
-		return deserializer.equals(obj);
+		return (deserializer != null) ? deserializer.equals(obj) : serializer.equals(obj);
 	}
 
 	public boolean canEqual(Object obj) {
-		return deserializer.canEqual(obj);
+		return (deserializer != null) ? deserializer.canEqual(obj) : serializer.canEqual(obj);
 	}
 
 	public int hashCode() {
-		return deserializer.hashCode();
+		return (deserializer != null) ? deserializer.hashCode() : serializer.hashCode();
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/39c8270d/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 85cbfdb..a606a18 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -36,7 +36,7 @@ import java.io.Serializable;
  * @param <T> The data type that the serializer serializes.
  */
 @PublicEvolving
-public abstract class TypeSerializer<T> implements TypeDeserializer<T>, Serializable
{
+public abstract class TypeSerializer<T> implements Serializable {
 	
 	private static final long serialVersionUID = 1L;
 


Mime
View raw message