flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject [03/12] flink git commit: [FLINK-6853] [DataStream] Let StreamRecordSerializer be compatible with MultiplexingStreamRecordSerializer
Date Wed, 07 Jun 2017 17:22:50 GMT
[FLINK-6853] [DataStream] Let StreamRecordSerializer be compatible with MultiplexingStreamRecordSerializer

This commit lets StreamRecordSerializer.ensureCompatibility be tolerable
for config snapshots taken from the legacy
MultiplexingStreamRecordSerializer. This is required for users which
originally used MultiplexingStreamRecordSerializer to serialize stream
elements as part of their checkpointed state (e.g. FlinkCEP).

This closes #4079.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1d89dd06
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1d89dd06
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1d89dd06

Branch: refs/heads/release-1.3
Commit: 1d89dd06c1f9b09420ad3ff095d0842b4a951938
Parents: e1e207c
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Tue Jun 6 10:41:03 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Wed Jun 7 18:52:00 2017 +0200

----------------------------------------------------------------------
 .../streamrecord/StreamElementSerializer.java   | 41 ++++++++++++--------
 1 file changed, 25 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1d89dd06/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index 390ac9d..19a69f5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -32,6 +32,7 @@ import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.migration.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 
@@ -285,26 +286,34 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 
 	@Override
 	public CompatibilityResult<StreamElement> ensureCompatibility(TypeSerializerConfigSnapshot
configSnapshot) {
+		Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousTypeSerializerAndConfig;
+
+		// we are compatible for data written by ourselves or the legacy MultiplexingStreamRecordSerializer
 		if (configSnapshot instanceof StreamElementSerializerConfigSnapshot) {
-			Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousTypeSerializerAndConfig
=
+			previousTypeSerializerAndConfig =
 				((StreamElementSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
-
-			CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
-					previousTypeSerializerAndConfig.f0,
-					UnloadableDummyTypeSerializer.class,
-					previousTypeSerializerAndConfig.f1,
-					typeSerializer);
-
-			if (!compatResult.isRequiresMigration()) {
-				return CompatibilityResult.compatible();
-			} else if (compatResult.getConvertDeserializer() != null) {
-				return CompatibilityResult.requiresMigration(
-					new StreamElementSerializer<>(
-						new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
-			}
+		} else if (configSnapshot instanceof MultiplexingStreamRecordSerializer.MultiplexingStreamRecordSerializerConfigSnapshot)
{
+			previousTypeSerializerAndConfig =
+				((MultiplexingStreamRecordSerializer.MultiplexingStreamRecordSerializerConfigSnapshot)
configSnapshot).getSingleNestedSerializerAndConfig();
+		} else {
+			return CompatibilityResult.requiresMigration();
 		}
 
-		return CompatibilityResult.requiresMigration();
+		CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
+				previousTypeSerializerAndConfig.f0,
+				UnloadableDummyTypeSerializer.class,
+				previousTypeSerializerAndConfig.f1,
+				typeSerializer);
+
+		if (!compatResult.isRequiresMigration()) {
+			return CompatibilityResult.compatible();
+		} else if (compatResult.getConvertDeserializer() != null) {
+			return CompatibilityResult.requiresMigration(
+				new StreamElementSerializer<>(
+					new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
+		} else {
+			return CompatibilityResult.requiresMigration();
+		}
 	}
 
 	/**


Mime
View raw message