Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1D880200CAF for ; Wed, 7 Jun 2017 19:22:50 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1C263160BBF; Wed, 7 Jun 2017 17:22:50 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 617E5160BD0 for ; Wed, 7 Jun 2017 19:22:49 +0200 (CEST) Received: (qmail 98868 invoked by uid 500); 7 Jun 2017 17:22:48 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 98854 invoked by uid 99); 7 Jun 2017 17:22:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jun 2017 17:22:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 652C5DFE8F; Wed, 7 Jun 2017 17:22:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tzulitai@apache.org To: commits@flink.apache.org Date: Wed, 07 Jun 2017 17:22:50 -0000 Message-Id: In-Reply-To: <7bee20c886c540fd9c56125559f1d972@git.apache.org> References: <7bee20c886c540fd9c56125559f1d972@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [03/12] flink git commit: [FLINK-6853] [DataStream] Let StreamRecordSerializer be compatible with MultiplexingStreamRecordSerializer archived-at: Wed, 07 Jun 2017 17:22:50 -0000 [FLINK-6853] [DataStream] Let StreamRecordSerializer be compatible with MultiplexingStreamRecordSerializer This commit lets StreamRecordSerializer.ensureCompatibility be tolerable for config snapshots taken from the legacy MultiplexingStreamRecordSerializer. This is required for users which originally used MultiplexingStreamRecordSerializer to serialize stream elements as part of their checkpointed state (e.g. FlinkCEP). This closes #4079. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1d89dd06 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1d89dd06 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1d89dd06 Branch: refs/heads/release-1.3 Commit: 1d89dd06c1f9b09420ad3ff095d0842b4a951938 Parents: e1e207c Author: Tzu-Li (Gordon) Tai Authored: Tue Jun 6 10:41:03 2017 +0200 Committer: Tzu-Li (Gordon) Tai Committed: Wed Jun 7 18:52:00 2017 +0200 ---------------------------------------------------------------------- .../streamrecord/StreamElementSerializer.java | 41 ++++++++++++-------- 1 file changed, 25 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1d89dd06/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java index 390ac9d..19a69f5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java @@ -32,6 +32,7 @@ import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.migration.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; @@ -285,26 +286,34 @@ public final class StreamElementSerializer extends TypeSerializer ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + Tuple2, TypeSerializerConfigSnapshot> previousTypeSerializerAndConfig; + + // we are compatible for data written by ourselves or the legacy MultiplexingStreamRecordSerializer if (configSnapshot instanceof StreamElementSerializerConfigSnapshot) { - Tuple2, TypeSerializerConfigSnapshot> previousTypeSerializerAndConfig = + previousTypeSerializerAndConfig = ((StreamElementSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig(); - - CompatibilityResult compatResult = CompatibilityUtil.resolveCompatibilityResult( - previousTypeSerializerAndConfig.f0, - UnloadableDummyTypeSerializer.class, - previousTypeSerializerAndConfig.f1, - typeSerializer); - - if (!compatResult.isRequiresMigration()) { - return CompatibilityResult.compatible(); - } else if (compatResult.getConvertDeserializer() != null) { - return CompatibilityResult.requiresMigration( - new StreamElementSerializer<>( - new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()))); - } + } else if (configSnapshot instanceof MultiplexingStreamRecordSerializer.MultiplexingStreamRecordSerializerConfigSnapshot) { + previousTypeSerializerAndConfig = + ((MultiplexingStreamRecordSerializer.MultiplexingStreamRecordSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig(); + } else { + return CompatibilityResult.requiresMigration(); } - return CompatibilityResult.requiresMigration(); + CompatibilityResult compatResult = CompatibilityUtil.resolveCompatibilityResult( + previousTypeSerializerAndConfig.f0, + UnloadableDummyTypeSerializer.class, + previousTypeSerializerAndConfig.f1, + typeSerializer); + + if (!compatResult.isRequiresMigration()) { + return CompatibilityResult.compatible(); + } else if (compatResult.getConvertDeserializer() != null) { + return CompatibilityResult.requiresMigration( + new StreamElementSerializer<>( + new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()))); + } else { + return CompatibilityResult.requiresMigration(); + } } /**