flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject [2/8] flink git commit: [FLINK-6178] [core] Allow serializer upgrades for managed state
Date Sun, 07 May 2017 19:47:00 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 7152bfc..96025fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -43,6 +43,8 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -71,7 +73,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -642,6 +643,139 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		state.value();
 	}
 
+	/**
+	 * Verifies that keyed state written with a {@link KryoSerializer} can be restored and used
+	 * again when the nested classes are registered with Kryo in a different order than at the
+	 * time the snapshot was taken.
+	 *
+	 * <p>The test only checks that restoring, updating and snapshotting complete without
+	 * throwing; it does not assert on the restored values.
+	 */
+	@Test
+	public void testKryoRestoreResilienceWithDifferentRegistrationOrder() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		Environment env = new DummyEnvironment("test", 1, 0);
+
+		// register A first then B
+		env.getExecutionConfig().registerKryoType(TestNestedPojoClassA.class);
+		env.getExecutionConfig().registerKryoType(TestNestedPojoClassB.class);
+
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+		TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+		// make sure that we are in fact using the KryoSerializer
+		assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+		ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+		ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		// ============== create snapshot of current configuration ==============
+
+		// write state for two keys so that the snapshot contains serialized TestPojo values
+		backend.setCurrentKey(1);
+		state.update(new TestPojo("u1", 1, new TestNestedPojoClassA(1.0, 2), new TestNestedPojoClassB(2.3, "foo")));
+
+		backend.setCurrentKey(2);
+		state.update(new TestPojo("u2", 2, new TestNestedPojoClassA(2.0, 5), new TestNestedPojoClassB(3.1, "bar")));
+
+		KeyedStateHandle snapshot = runSnapshot(backend.snapshot(
+			682375462378L,
+			2,
+			streamFactory,
+			CheckpointOptions.forFullCheckpoint()));
+
+		backend.dispose();
+
+		// ========== restore snapshot, with a different registration order in the configuration ==========
+
+		env = new DummyEnvironment("test", 1, 0);
+
+		env.getExecutionConfig().registerKryoType(TestNestedPojoClassB.class); // this time register B first
+		env.getExecutionConfig().registerKryoType(TestNestedPojoClassA.class);
+
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
+
+		snapshot.discardState();
+
+		// re-initialize to ensure that we create the KryoSerializer from scratch, otherwise
+		// initializeSerializerUnlessSet would not pick up our new config
+		kvId = new ValueStateDescriptor<>("id", pojoType);
+		state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		backend.setCurrentKey(1);
+
+		// update to test state backends that eagerly serialize, such as RocksDB
+		state.update(new TestPojo("u1", 11, new TestNestedPojoClassA(22.1, 12), new TestNestedPojoClassB(1.23, "foobar")));
+
+		// this tests backends that lazily serialize, such as memory state backend
+		runSnapshot(backend.snapshot(
+			682375462378L,
+			2,
+			streamFactory,
+			CheckpointOptions.forFullCheckpoint()));
+
+		backend.dispose();
+	}
+
+	/**
+	 * Verifies that keyed state written with a {@link PojoSerializer} can be restored and used
+	 * again when the nested POJO types are registered in a different order than at the time
+	 * the snapshot was taken.
+	 *
+	 * <p>The test only checks that restoring, updating and snapshotting complete without
+	 * throwing; it does not assert on the restored values.
+	 */
+	@Test
+	public void testPojoRestoreResilienceWithDifferentRegistrationOrder() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		Environment env = new DummyEnvironment("test", 1, 0);
+
+		// register A first then B
+		env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class);
+		env.getExecutionConfig().registerPojoType(TestNestedPojoClassB.class);
+
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+		TypeInformation<TestPojo> pojoType = TypeExtractor.getForClass(TestPojo.class);
+
+		// make sure that we are in fact using the PojoSerializer
+		assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof PojoSerializer);
+
+		ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+		ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		// ============== create snapshot of current configuration ==============
+
+		// write state for two keys so that the snapshot contains serialized TestPojo values
+		backend.setCurrentKey(1);
+		state.update(new TestPojo("u1", 1, new TestNestedPojoClassA(1.0, 2), new TestNestedPojoClassB(2.3, "foo")));
+
+		backend.setCurrentKey(2);
+		state.update(new TestPojo("u2", 2, new TestNestedPojoClassA(2.0, 5), new TestNestedPojoClassB(3.1, "bar")));
+
+		KeyedStateHandle snapshot = runSnapshot(backend.snapshot(
+			682375462378L,
+			2,
+			streamFactory,
+			CheckpointOptions.forFullCheckpoint()));
+
+		backend.dispose();
+
+		// ========== restore snapshot, with a different registration order in the configuration ==========
+
+		env = new DummyEnvironment("test", 1, 0);
+
+		env.getExecutionConfig().registerPojoType(TestNestedPojoClassB.class); // this time register B first
+		env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class);
+
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
+
+		snapshot.discardState();
+
+		// re-initialize to ensure that we create the PojoSerializer from scratch, otherwise
+		// initializeSerializerUnlessSet would not pick up our new config
+		kvId = new ValueStateDescriptor<>("id", pojoType);
+		state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		backend.setCurrentKey(1);
+
+		// update to test state backends that eagerly serialize, such as RocksDB
+		state.update(new TestPojo("u1", 11, new TestNestedPojoClassA(22.1, 12), new TestNestedPojoClassB(1.23, "foobar")));
+
+		// this tests backends that lazily serialize, such as memory state backend
+		runSnapshot(backend.snapshot(
+			682375462378L,
+			2,
+			streamFactory,
+			CheckpointOptions.forFullCheckpoint()));
+
+		backend.dispose();
+	}
 
 	@Test
 	@SuppressWarnings("unchecked")
@@ -1696,8 +1830,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				state.value();
 
 				fail("should recognize wrong serializers");
-			} catch (IOException e) {
-				if (!e.getMessage().contains("Trying to access state using wrong")) {
+			} catch (RuntimeException e) {
+				if (!e.getMessage().contains("State migration currently isn't supported")) {
 					fail("wrong exception " + e);
 				}
 				// expected
@@ -1747,8 +1881,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				state.get();
 
 				fail("should recognize wrong serializers");
-			} catch (IOException e) {
-				if (!e.getMessage().contains("Trying to access state using wrong")) {
+			} catch (RuntimeException e) {
+				if (!e.getMessage().contains("State migration currently isn't supported")) {
 					fail("wrong exception " + e);
 				}
 				// expected
@@ -1800,8 +1934,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				state.get();
 
 				fail("should recognize wrong serializers");
-			} catch (IOException e) {
-				if (!e.getMessage().contains("Trying to access state using wrong ")) {
+			} catch (RuntimeException e) {
+				if (!e.getMessage().contains("State migration currently isn't supported")) {
 					fail("wrong exception " + e);
 				}
 				// expected
@@ -1851,8 +1985,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				state.entries();
 
 				fail("should recognize wrong serializers");
-			} catch (IOException e) {
-				if (!e.getMessage().contains("Trying to access state using wrong ")) {
+			} catch (RuntimeException e) {
+				if (!e.getMessage().contains("State migration currently isn't supported")) {
 					fail("wrong exception " + e);
 				}
 				// expected
@@ -2382,15 +2516,27 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		return snapshotRunnableFuture.get();
 	}
 
-	private static class TestPojo implements Serializable {
+	public static class TestPojo implements Serializable {
 		private String strField;
 		private Integer intField;
 
+		private TestNestedPojoClassA kryoClassAField;
+		private TestNestedPojoClassB kryoClassBField;
+
 		public TestPojo() {}
 
 		public TestPojo(String strField, Integer intField) {
 			this.strField = strField;
 			this.intField = intField;
+			this.kryoClassAField = null;
+			this.kryoClassBField = null;
+		}
+
+		public TestPojo(String strField, Integer intField, TestNestedPojoClassA classAField, TestNestedPojoClassB classBfield) {
+			this.strField = strField;
+			this.intField = intField;
+			this.kryoClassAField = classAField;
+			this.kryoClassBField = classBfield;
 		}
 
 		public String getStrField() {
@@ -2409,6 +2555,22 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			this.intField = intField;
 		}
 
+		public TestNestedPojoClassA getKryoClassAField() {
+			return kryoClassAField;
+		}
+
+		public void setKryoClassAField(TestNestedPojoClassA kryoClassAField) {
+			this.kryoClassAField = kryoClassAField;
+		}
+
+		public TestNestedPojoClassB getKryoClassBField() {
+			return kryoClassBField;
+		}
+
+		public void setKryoClassBField(TestNestedPojoClassB kryoClassBField) {
+			this.kryoClassBField = kryoClassBField;
+		}
+
 		@Override
 		public String toString() {
 			return "TestPojo{" +
@@ -2424,14 +2586,133 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 			TestPojo testPojo = (TestPojo) o;
 
-			if (!strField.equals(testPojo.strField)) return false;
-			return intField.equals(testPojo.intField);
+			return strField.equals(testPojo.strField)
+				&& intField.equals(testPojo.intField)
+				&& ((kryoClassAField == null && testPojo.kryoClassAField == null) || kryoClassAField.equals(testPojo.kryoClassAField))
+				&& ((kryoClassBField == null && testPojo.kryoClassBField == null) || kryoClassBField.equals(testPojo.kryoClassBField));
 		}
 
 		@Override
 		public int hashCode() {
 			int result = strField.hashCode();
 			result = 31 * result + intField.hashCode();
+
+			if (kryoClassAField != null) {
+				result = 31 * result + kryoClassAField.hashCode();
+			}
+
+			if (kryoClassBField != null) {
+				result = 31 * result + kryoClassBField.hashCode();
+			}
+
+			return result;
+		}
+	}
+
+	/**
+	 * Nested POJO with a {@code Double} and an {@code Integer} field, used as a field of
+	 * {@code TestPojo} in the serializer registration-order tests.
+	 *
+	 * <p>Both fields are {@code null} after the no-arg constructor, so {@link #equals(Object)}
+	 * and {@link #hashCode()} are null-safe.
+	 */
+	public static class TestNestedPojoClassA implements Serializable {
+		private Double doubleField;
+		private Integer intField;
+
+		/** Creates an instance with both fields unset ({@code null}). */
+		public TestNestedPojoClassA() {}
+
+		public TestNestedPojoClassA(Double doubleField, Integer intField) {
+			this.doubleField = doubleField;
+			this.intField = intField;
+		}
+
+		public Double getDoubleField() {
+			return doubleField;
+		}
+
+		public void setDoubleField(Double doubleField) {
+			this.doubleField = doubleField;
+		}
+
+		public Integer getIntField() {
+			return intField;
+		}
+
+		public void setIntField(Integer intField) {
+			this.intField = intField;
+		}
+
+		@Override
+		public String toString() {
+			return "TestNestedPojoClassA{" +
+				"doubleField='" + doubleField + '\'' +
+				", intField=" + intField +
+				'}';
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) return true;
+			if (o == null || getClass() != o.getClass()) return false;
+
+			TestNestedPojoClassA that = (TestNestedPojoClassA) o;
+
+			// null-safe comparison: the no-arg constructor leaves both fields null
+			return java.util.Objects.equals(doubleField, that.doubleField)
+				&& java.util.Objects.equals(intField, that.intField);
+		}
+
+		@Override
+		public int hashCode() {
+			// Objects.hashCode yields 0 for null and the field's own hash code otherwise,
+			// so the result is unchanged for fully populated instances
+			int result = java.util.Objects.hashCode(doubleField);
+			result = 31 * result + java.util.Objects.hashCode(intField);
+			return result;
+		}
+	}
+
+	/**
+	 * Nested POJO with a {@code Double} and a {@code String} field, used as a field of
+	 * {@code TestPojo} in the serializer registration-order tests.
+	 *
+	 * <p>Both fields are {@code null} after the no-arg constructor, so {@link #equals(Object)}
+	 * and {@link #hashCode()} are null-safe.
+	 */
+	public static class TestNestedPojoClassB implements Serializable {
+		private Double doubleField;
+		private String strField;
+
+		/** Creates an instance with both fields unset ({@code null}). */
+		public TestNestedPojoClassB() {}
+
+		public TestNestedPojoClassB(Double doubleField, String strField) {
+			this.doubleField = doubleField;
+			this.strField = strField;
+		}
+
+		public Double getDoubleField() {
+			return doubleField;
+		}
+
+		public void setDoubleField(Double doubleField) {
+			this.doubleField = doubleField;
+		}
+
+		public String getStrField() {
+			return strField;
+		}
+
+		public void setStrField(String strField) {
+			this.strField = strField;
+		}
+
+		@Override
+		public String toString() {
+			return "TestNestedPojoClassB{" +
+				"doubleField='" + doubleField + '\'' +
+				", strField=" + strField +
+				'}';
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) return true;
+			if (o == null || getClass() != o.getClass()) return false;
+
+			TestNestedPojoClassB that = (TestNestedPojoClassB) o;
+
+			// null-safe comparison: the no-arg constructor leaves both fields null
+			return java.util.Objects.equals(doubleField, that.doubleField)
+				&& java.util.Objects.equals(strField, that.strField);
+		}
+
+		@Override
+		public int hashCode() {
+			// Objects.hashCode yields 0 for null and the field's own hash code otherwise,
+			// so the result is unchanged for fully populated instances
+			int result = java.util.Objects.hashCode(doubleField);
+			result = 31 * result + java.util.Objects.hashCode(strField);
 			return result;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
index 976b9aa..8289821 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
@@ -19,7 +19,9 @@
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -29,7 +31,7 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.ArrayListSerializer;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
@@ -50,8 +52,8 @@ public class CopyOnWriteStateTableTest extends TestLogger {
 	 */
 	@Test
 	public void testPutGetRemoveContainsTransform() throws Exception {
-		RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
-				new RegisteredBackendStateMetaInfo<>(
+		RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+				new RegisteredKeyedBackendStateMetaInfo<>(
 						StateDescriptor.Type.UNKNOWN,
 						"test",
 						IntSerializer.INSTANCE,
@@ -122,8 +124,8 @@ public class CopyOnWriteStateTableTest extends TestLogger {
 	 */
 	@Test
 	public void testIncrementalRehash() {
-		RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
-				new RegisteredBackendStateMetaInfo<>(
+		RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+				new RegisteredKeyedBackendStateMetaInfo<>(
 						StateDescriptor.Type.UNKNOWN,
 						"test",
 						IntSerializer.INSTANCE,
@@ -167,8 +169,8 @@ public class CopyOnWriteStateTableTest extends TestLogger {
 	@Test
 	public void testRandomModificationsAndCopyOnWriteIsolation() throws Exception {
 
-		final RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
-				new RegisteredBackendStateMetaInfo<>(
+		final RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+				new RegisteredKeyedBackendStateMetaInfo<>(
 						StateDescriptor.Type.UNKNOWN,
 						"test",
 						IntSerializer.INSTANCE,
@@ -322,8 +324,8 @@ public class CopyOnWriteStateTableTest extends TestLogger {
 	 */
 	@Test
 	public void testCopyOnWriteContracts() {
-		RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
-				new RegisteredBackendStateMetaInfo<>(
+		RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+				new RegisteredKeyedBackendStateMetaInfo<>(
 						StateDescriptor.Type.UNKNOWN,
 						"test",
 						IntSerializer.INSTANCE,
@@ -397,8 +399,8 @@ public class CopyOnWriteStateTableTest extends TestLogger {
 		final TestDuplicateSerializer stateSerializer = new TestDuplicateSerializer();;
 		final TestDuplicateSerializer keySerializer = new TestDuplicateSerializer();;
 
-		RegisteredBackendStateMetaInfo<Integer, Integer> metaInfo =
-			new RegisteredBackendStateMetaInfo<>(
+		RegisteredKeyedBackendStateMetaInfo<Integer, Integer> metaInfo =
+			new RegisteredKeyedBackendStateMetaInfo<>(
 				StateDescriptor.Type.VALUE,
 				"test",
 				namespaceSerializer,
@@ -649,5 +651,15 @@ public class CopyOnWriteStateTableTest extends TestLogger {
 		public void disable() {
 			this.disabled = true;
 		}
+
+		/** Not implemented for this test serializer; always throws {@link UnsupportedOperationException}. */
+		@Override
+		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+			throw new UnsupportedOperationException();
+		}
+
+		/** Not implemented for this test serializer; always throws {@link UnsupportedOperationException}. */
+		@Override
+		public CompatibilityResult<Integer> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+			throw new UnsupportedOperationException();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
index 6fd94f7..85bc177 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.ArrayListSerializer;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -44,8 +44,8 @@ public class StateTableSnapshotCompatibilityTest {
 	@Test
 	public void checkCompatibleSerializationFormats() throws IOException {
 		final Random r = new Random(42);
-		RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
-				new RegisteredBackendStateMetaInfo<>(
+		RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+				new RegisteredKeyedBackendStateMetaInfo<>(
 						StateDescriptor.Type.UNKNOWN,
 						"test",
 						IntSerializer.INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java
index 146ccd0..ce23f30 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java
@@ -21,7 +21,9 @@ package org.apache.flink.runtime.testutils.recordutils;
 
 import java.io.IOException;
 
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.Record;
@@ -141,4 +143,14 @@ public final class RecordSerializer extends TypeSerializer<Record> {
 	public int hashCode() {
 		return RecordSerializer.class.hashCode();
 	}
+
+	/** Not implemented for this test utility serializer; always throws {@link UnsupportedOperationException}. */
+	@Override
+	public TypeSerializerConfigSnapshot snapshotConfiguration() {
+		throw new UnsupportedOperationException();
+	}
+
+	/** Not implemented for this test utility serializer; always throws {@link UnsupportedOperationException}. */
+	@Override
+	public CompatibilityResult<Record> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		throw new UnsupportedOperationException();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index 6871159..468fddc 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -18,8 +18,9 @@
 package org.apache.flink.api.scala.typeutils
 
 import org.apache.flink.annotation.Internal
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.core.memory.{DataOutputView, DataInputView}
+import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerializer, TypeSerializerConfigSnapshot}
+import org.apache.flink.api.java.typeutils.runtime.EitherSerializerConfigSnapshot
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 
 /**
  * Serializer for [[Either]].
@@ -104,4 +105,47 @@ class EitherSerializer[A, B, T <: Either[A, B]](
   override def hashCode(): Int = {
     31 * leftSerializer.hashCode() + rightSerializer.hashCode()
   }
+
+  // --------------------------------------------------------------------------------------------
+  // Serializer configuration snapshotting & compatibility
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Snapshots the configuration of this serializer as the pair of configuration snapshots
+   * of the left and the right serializer, in that order.
+   */
+  override def snapshotConfiguration(): EitherSerializerConfigSnapshot = {
+    val leftConfig = leftSerializer.snapshotConfiguration()
+    val rightConfig = rightSerializer.snapshotConfiguration()
+    new EitherSerializerConfigSnapshot(leftConfig, rightConfig)
+  }
+
+  /**
+   * Checks compatibility against a previous configuration snapshot by delegating to the
+   * left and right serializers with the nested snapshots at index 0 and 1.
+   *
+   * The result is compatible when neither side requires migration. Otherwise migration is
+   * required; a convert deserializer is only supplied when both sides provide one.
+   * Any snapshot that is not an [[EitherSerializerConfigSnapshot]] requires migration.
+   */
+  override def ensureCompatibility(
+      configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[T] = {
+
+    configSnapshot match {
+      case eitherConfig: EitherSerializerConfigSnapshot =>
+        val nestedConfigs = eitherConfig.getNestedSerializerConfigSnapshots
+
+        val leftResult = leftSerializer.ensureCompatibility(nestedConfigs(0))
+        val rightResult = rightSerializer.ensureCompatibility(nestedConfigs(1))
+
+        if (!leftResult.requiresMigration && !rightResult.requiresMigration) {
+          CompatibilityResult.compatible()
+        } else if (leftResult.getConvertDeserializer == null
+            || rightResult.getConvertDeserializer == null) {
+          // at least one side cannot read the old format, so no convert deserializer exists
+          CompatibilityResult.requiresMigration(null)
+        } else {
+          CompatibilityResult.requiresMigration(
+            new EitherSerializer[A, B, T](
+              leftResult.getConvertDeserializer,
+              rightResult.getConvertDeserializer
+            )
+          )
+        }
+
+      case _ => CompatibilityResult.requiresMigration(null)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
index 67c1445..dc96c98 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
@@ -17,10 +17,14 @@
  */
 package org.apache.flink.api.scala.typeutils
 
+import java.io.IOException
+
 import org.apache.flink.annotation.Internal
-import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerializer, TypeSerializerConfigSnapshot}
 import org.apache.flink.api.common.typeutils.base.IntSerializer
+import org.apache.flink.api.java.typeutils.runtime.{DataInputViewStream, DataOutputViewStream}
 import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.util.InstantiationUtil
 
 /**
  * Serializer for [[Enumeration]] values.
@@ -67,4 +71,114 @@ class EnumValueSerializer[E <: Enumeration](val enum: E) extends TypeSerializer[
   override def canEqual(obj: scala.Any): Boolean = {
     obj.isInstanceOf[EnumValueSerializer[_]]
   }
+
+  // --------------------------------------------------------------------------------------------
+  // Serializer configuration snapshotting & compatibility
+  // --------------------------------------------------------------------------------------------
+
+  /** Snapshots the configuration of this serializer, identified by the class of `enum`. */
+  override def snapshotConfiguration(): EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[E] = {
+    val enumClass = enum.getClass.asInstanceOf[Class[E]]
+    new EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[E](enumClass)
+  }
+
+  /**
+   * Checks whether state written with the given configuration snapshot can still be read
+   * by this serializer.
+   *
+   * The result is compatible only when the snapshot is a `ScalaEnumSerializerConfigSnapshot`
+   * for the same enum class and every snapshotted constant is still at the same position in
+   * the current constants (new constants may only be appended). Every other case yields
+   * `CompatibilityResult.requiresMigration(null)`.
+   *
+   * @param configSnapshot configuration snapshot of the serializer that wrote the state
+   * @return the compatibility result for this serializer
+   */
+  override def ensureCompatibility(
+      configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[E#Value] = {
+
+    configSnapshot match {
+      case enumSerializerConfigSnapshot: EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[_] =>
+        val enumClass = enum.getClass.asInstanceOf[Class[E]]
+        if (enumClass.equals(enumSerializerConfigSnapshot.getEnumClass)) {
+          val currentEnumConstants = enumClass.getEnumConstants
+          val previousEnumConstants = enumSerializerConfigSnapshot.getEnumConstants
+
+          // NOTE(review): Class#getEnumConstants returns null for classes that are not Java
+          // enums; confirm whether that is the case for scala.Enumeration classes. Without
+          // both constant lists the order cannot be verified, so stay conservative.
+          val constantsAvailable = currentEnumConstants != null && previousEnumConstants != null
+
+          // compatible only if new enum constants are only appended,
+          // and original constants must be in the exact same order
+          val previousIsPrefix = constantsAvailable &&
+            previousEnumConstants.length <= currentEnumConstants.length &&
+            (0 until previousEnumConstants.length).forall { i =>
+              currentEnumConstants(i) == previousEnumConstants(i)
+            }
+
+          // the verdict must be the value of this branch; a bare expression inside a
+          // loop body would be discarded
+          if (previousIsPrefix) {
+            CompatibilityResult.compatible()
+          } else {
+            CompatibilityResult.requiresMigration(null)
+          }
+        } else {
+          CompatibilityResult.requiresMigration(null)
+        }
+
+      case _ => CompatibilityResult.requiresMigration(null)
+    }
+  }
+}
+
+object EnumValueSerializer {
+
+  /**
+   * Configuration snapshot of an [[EnumValueSerializer]], consisting of the enum class and
+   * the constants obtained from it via `Class#getEnumConstants`.
+   *
+   * Both values are written with `InstantiationUtil.serializeObject` after the base class
+   * data, and read back in the same order.
+   *
+   * @param enumClass class of the enumeration; `null` only while an instance created with
+   *                  the nullary constructor has not been populated by `read` yet
+   */
+  class ScalaEnumSerializerConfigSnapshot[E <: Enumeration](private var enumClass: Class[E])
+      extends TypeSerializerConfigSnapshot {
+
+    // The nullary constructor passes a null class, so the constants cannot be derived
+    // unconditionally here; `read` fills in both fields afterwards.
+    var enumConstants: Array[E] =
+      if (enumClass == null) null else enumClass.getEnumConstants
+
+    /** This empty nullary constructor is required for deserializing the configuration. */
+    def this() = this(null)
+
+    override def write(out: DataOutputView): Unit = {
+      super.write(out)
+
+      val outViewWrapper = new DataOutputViewStream(out)
+      try {
+        InstantiationUtil.serializeObject(outViewWrapper, enumClass)
+        InstantiationUtil.serializeObject(outViewWrapper, enumConstants)
+      } finally {
+        outViewWrapper.close()
+      }
+    }
+
+    override def read(in: DataInputView): Unit = {
+      super.read(in)
+
+      val inViewWrapper = new DataInputViewStream(in)
+      try {
+        enumClass = InstantiationUtil.deserializeObject(
+          inViewWrapper, getUserCodeClassLoader)
+
+        enumConstants = InstantiationUtil.deserializeObject(
+          inViewWrapper, getUserCodeClassLoader)
+      } catch {
+        case e: ClassNotFoundException =>
+          throw new IOException("The requested enum class cannot be found in classpath.", e)
+      } finally {
+        inViewWrapper.close()
+      }
+    }
+
+    override def getVersion: Int = ScalaEnumSerializerConfigSnapshot.VERSION
+
+    def getEnumClass: Class[E] = enumClass
+
+    def getEnumConstants: Array[E] = enumConstants
+
+    override def equals(obj: scala.Any): Boolean = obj match {
+      case other: ScalaEnumSerializerConfigSnapshot[E @unchecked] =>
+        // identity is checked with `eq`: using `==` against `this` inside equals would
+        // dispatch back into equals for two distinct instances
+        (other eq this) ||
+          (enumClass == other.enumClass && sameConstants(other.enumConstants))
+      case _ => false
+    }
+
+    /** Null-safe element-wise comparison of this snapshot's constants with the given ones. */
+    private def sameConstants(otherConstants: Array[E]): Boolean = {
+      if (enumConstants == null || otherConstants == null) {
+        enumConstants == null && otherConstants == null
+      } else {
+        enumConstants.sameElements(otherConstants)
+      }
+    }
+
+    override def hashCode(): Int = {
+      // unset fields contribute 0 so that an unpopulated snapshot can still be hashed
+      val classHash = if (enumClass == null) 0 else enumClass.hashCode()
+      val constantsHash = if (enumConstants == null) 0 else enumConstants.toSeq.hashCode()
+      classHash * 31 + constantsHash
+    }
+  }
+
+  object ScalaEnumSerializerConfigSnapshot {
+    val VERSION = 1
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
index fa5279e..01ca295 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
@@ -18,8 +18,8 @@
 package org.apache.flink.api.scala.typeutils
 
 import org.apache.flink.annotation.Internal
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.core.memory.{DataOutputView, DataInputView}
+import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerializer, TypeSerializerConfigSnapshot}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 
 /**
  * Serializer for cases where no serializer is required but the system still expects one. This
@@ -50,13 +50,19 @@ class NothingSerializer extends TypeSerializer[Any] {
   override def serialize(any: Any, target: DataOutputView): Unit =
     throw new RuntimeException("This must not be used. You encountered a bug.")
 
-
   override def deserialize(source: DataInputView): Any =
     throw new RuntimeException("This must not be used. You encountered a bug.")
 
   override def deserialize(reuse: Any, source: DataInputView): Any =
     throw new RuntimeException("This must not be used. You encountered a bug.")
 
+  override def snapshotConfiguration(): TypeSerializerConfigSnapshot =
+    throw new RuntimeException("This must not be used. You encountered a bug.")
+
+  override def ensureCompatibility(
+      configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[Any] =
+    throw new RuntimeException("This must not be used. You encountered a bug.")
+
   override def equals(obj: Any): Boolean = {
     obj match {
       case nothingSerializer: NothingSerializer => nothingSerializer.canEqual(this)

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
index a8b3a56..d2bb098 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -18,8 +18,8 @@
 package org.apache.flink.api.scala.typeutils
 
 import org.apache.flink.annotation.Internal
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.core.memory.{DataOutputView, DataInputView}
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 
 /**
  * Serializer for [[Option]].
@@ -95,4 +95,52 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A])
   override def hashCode(): Int = {
     elemSerializer.hashCode()
   }
+
+  // --------------------------------------------------------------------------------------------
+  // Serializer configuration snapshotting & compatibility
+  // --------------------------------------------------------------------------------------------
+
+  override def snapshotConfiguration(): OptionSerializer.OptionSerializerConfigSnapshot = {
+    new OptionSerializer.OptionSerializerConfigSnapshot(elemSerializer.snapshotConfiguration())
+  }
+
+  override def ensureCompatibility(
+      configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[Option[A]] = {
+    configSnapshot match {
+      case optionSerializerConfigSnapshot: OptionSerializer.OptionSerializerConfigSnapshot =>
+        val compatResult = elemSerializer.ensureCompatibility(
+          optionSerializerConfigSnapshot.getSingleNestedSerializerConfigSnapshot)
+
+        if (compatResult.requiresMigration()) {
+          if (compatResult.getConvertDeserializer != null) {
+            CompatibilityResult.requiresMigration(
+              new OptionSerializer[A](compatResult.getConvertDeserializer))
+          } else {
+            CompatibilityResult.requiresMigration(null)
+          }
+        } else {
+          CompatibilityResult.compatible()
+        }
+
+      case _ => CompatibilityResult.requiresMigration(null)
+    }
+  }
+}
+
+object OptionSerializer {
+
+  class OptionSerializerConfigSnapshot(
+      private var elemSerializerConfigSnapshot: TypeSerializerConfigSnapshot)
+    extends CompositeTypeSerializerConfigSnapshot(elemSerializerConfigSnapshot) {
+
+    /** This empty nullary constructor is required for deserializing the configuration. */
+    def this() = this(null)
+
+    override def getVersion: Int = OptionSerializerConfigSnapshot.VERSION
+  }
+
+  object OptionSerializerConfigSnapshot {
+    val VERSION = 1
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
index d1b9085..1ac46f9 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
@@ -20,8 +20,8 @@ package org.apache.flink.api.scala.typeutils
 import java.io.ObjectInputStream
 
 import org.apache.flink.annotation.Internal
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.core.memory.{DataOutputView, DataInputView}
+import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerializer, TypeSerializerConfigSnapshot}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 
 import scala.collection.generic.CanBuildFrom
 
@@ -150,4 +150,13 @@ abstract class TraversableSerializer[T <: TraversableOnce[E], E](
   override def canEqual(obj: Any): Boolean = {
     obj.isInstanceOf[TraversableSerializer[_, _]]
   }
+
+  override def snapshotConfiguration(): TypeSerializerConfigSnapshot = {
+    throw new UnsupportedOperationException()
+  }
+
+  override def ensureCompatibility(
+      configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[T] = {
+    throw new UnsupportedOperationException()
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
index a5ec03a..c864dc7 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
@@ -19,11 +19,12 @@ package org.apache.flink.api.scala.typeutils
 
 import org.apache.flink.annotation.Internal
 import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils._
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.KryoSerializerConfigSnapshot
 import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 
-import scala.util.{Success, Try, Failure}
+import scala.util.{Failure, Success, Try}
 
 /**
  * Serializer for [[scala.util.Try]].
@@ -98,4 +99,57 @@ class TrySerializer[A](
   override def hashCode(): Int = {
     31 * elemSerializer.hashCode() + executionConfig.hashCode()
   }
+
+  // --------------------------------------------------------------------------------------------
+  // Serializer configuration snapshotting & compatibility
+  // --------------------------------------------------------------------------------------------
+
+  override def snapshotConfiguration(): TypeSerializerConfigSnapshot = {
+    new TrySerializer.TrySerializerConfigSnapshot(
+        elemSerializer.snapshotConfiguration(),
+        throwableSerializer.snapshotConfiguration())
+  }
+
+  override def ensureCompatibility(
+      configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[Try[A]] = {
+
+    configSnapshot match {
+      case trySerializerConfigSnapshot: TrySerializer.TrySerializerConfigSnapshot =>
+        val serializerConfigSnapshots =
+          trySerializerConfigSnapshot.getNestedSerializerConfigSnapshots
+
+        val elemCompatRes =
+          elemSerializer.ensureCompatibility(serializerConfigSnapshots(0))
+        val throwableCompatRes =
+          throwableSerializer.ensureCompatibility(serializerConfigSnapshots(1))
+
+        if (elemCompatRes.requiresMigration() || throwableCompatRes.requiresMigration()) {
+          CompatibilityResult.requiresMigration(null)
+        } else {
+          CompatibilityResult.compatible()
+        }
+
+      case _ => CompatibilityResult.requiresMigration(null)
+    }
+  }
+}
+
+object TrySerializer {
+
+  class TrySerializerConfigSnapshot(
+      private var elemSerializerConfigSnapshot: TypeSerializerConfigSnapshot,
+      private var throwableSerializerConfigSnapshot: KryoSerializerConfigSnapshot[Throwable])
+    extends CompositeTypeSerializerConfigSnapshot(
+      elemSerializerConfigSnapshot, throwableSerializerConfigSnapshot) {
+
+    /** This empty nullary constructor is required for deserializing the configuration. */
+    def this() = this(null, null)
+
+    override def getVersion: Int = TrySerializerConfigSnapshot.VERSION
+  }
+
+  object TrySerializerConfigSnapshot {
+    val VERSION = 1
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
index fdcd5b8..53fea46 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
@@ -21,7 +21,11 @@ package org.apache.flink.migration.streaming.runtime.streamrecord;
 import static java.util.Objects.requireNonNull;
 
 import java.io.IOException;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -205,6 +209,53 @@ public class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<Stream
 		}
 	}
 
+	// --------------------------------------------------------------------------------------------
+	// Serializer configuration snapshotting & compatibility
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public MultiplexingStreamRecordSerializerConfigSnapshot snapshotConfiguration() {
+		return new MultiplexingStreamRecordSerializerConfigSnapshot(typeSerializer.snapshotConfiguration());
+	}
+
+	@Override
+	public CompatibilityResult<StreamElement> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		if (configSnapshot instanceof MultiplexingStreamRecordSerializerConfigSnapshot) {
+			CompatibilityResult<T> compatResult = typeSerializer.ensureCompatibility(
+				((MultiplexingStreamRecordSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot());
+
+			if (!compatResult.requiresMigration()) {
+				return CompatibilityResult.compatible();
+			} else if (compatResult.getConvertDeserializer() != null) {
+				return CompatibilityResult.requiresMigration(
+					new MultiplexingStreamRecordSerializer<>(compatResult.getConvertDeserializer()));
+			}
+		}
+
+		return CompatibilityResult.requiresMigration(null);
+	}
+
+	/**
+	 * Configuration snapshot specific to the {@link MultiplexingStreamRecordSerializer}.
+	 */
+	public static final class MultiplexingStreamRecordSerializerConfigSnapshot
+			extends CompositeTypeSerializerConfigSnapshot {
+
+		private static final int VERSION = 1;
+
+		/** This empty nullary constructor is required for deserializing the configuration. */
+		public MultiplexingStreamRecordSerializerConfigSnapshot() {}
+
+		public MultiplexingStreamRecordSerializerConfigSnapshot(TypeSerializerConfigSnapshot typeSerializerConfigSnapshot) {
+			super(typeSerializerConfigSnapshot);
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
index 2c8dc4a..2a87f4e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
@@ -20,7 +20,10 @@ package org.apache.flink.migration.streaming.runtime.streamrecord;
 
 import java.io.IOException;
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -144,4 +147,50 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
 	public int hashCode() {
 		return typeSerializer.hashCode();
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Serializer configuration snapshotting & compatibility
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public StreamRecordSerializerConfigSnapshot snapshotConfiguration() {
+		return new StreamRecordSerializerConfigSnapshot(typeSerializer.snapshotConfiguration());
+	}
+
+	@Override
+	public CompatibilityResult<StreamRecord<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		if (configSnapshot instanceof StreamRecordSerializerConfigSnapshot) {
+			CompatibilityResult<T> compatResult = typeSerializer.ensureCompatibility(
+				((StreamRecordSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot());
+
+			if (!compatResult.requiresMigration()) {
+				return CompatibilityResult.compatible(); // nested serializer is compatible, so this wrapper is too
+			} else if (compatResult.getConvertDeserializer() != null) {
+				return CompatibilityResult.requiresMigration( // wrap the nested convert deserializer for migration
+					new StreamRecordSerializer<>(compatResult.getConvertDeserializer()));
+			}
+		}
+
+		return CompatibilityResult.requiresMigration(null); // foreign snapshot type, or no convert deserializer
+	}
+
+	/**
+	 * Configuration snapshot specific to the {@link StreamRecordSerializer}.
+	 */
+	public static final class StreamRecordSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
+
+		private static final int VERSION = 1;
+
+		/** This empty nullary constructor is required for deserializing the configuration. */
+		public StreamRecordSerializerConfigSnapshot() {}
+
+		public StreamRecordSerializerConfigSnapshot(TypeSerializerConfigSnapshot typeSerializerConfigSnapshot) {
+			super(typeSerializerConfigSnapshot);
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index 13a8a24..f0c3dc2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -29,7 +29,9 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -547,6 +549,16 @@ public class CoGroupedStreams<T1, T2> {
 		public boolean canEqual(Object obj) {
 			return obj instanceof UnionSerializer;
 		}
+
+		@Override
+		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+			throw new UnsupportedOperationException("This serializer is not registered for managed state.");
+		}
+
+		@Override
+		public CompatibilityResult<TaggedUnion<T1, T2>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+			throw new UnsupportedOperationException("This serializer is not registered for managed state.");
+		}
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
index 7b6ba8d..1455712 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
@@ -19,7 +19,9 @@ package org.apache.flink.streaming.api.operators;
 
 import java.io.IOException;
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -184,5 +186,15 @@ public class InternalTimer<K, N> implements Comparable<InternalTimer<K, N>> {
 		public int hashCode() {
 			return getClass().hashCode();
 		}
+
+		@Override
+		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+			throw new UnsupportedOperationException("This serializer is not registered for managed state.");
+		}
+
+		@Override
+		public CompatibilityResult<InternalTimer<K, N>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+			throw new UnsupportedOperationException("This serializer is not registered for managed state.");
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
index b1e06f5..cf5c74c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.windowing.windows;
 import java.io.IOException;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -61,7 +62,7 @@ public class GlobalWindow extends Window {
 	/**
 	 * A {@link TypeSerializer} for {@link GlobalWindow}.
 	 */
-	public static class Serializer extends TypeSerializer<GlobalWindow> {
+	public static class Serializer extends TypeSerializerSingleton<GlobalWindow> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -70,11 +71,6 @@ public class GlobalWindow extends Window {
 		}
 
 		@Override
-		public TypeSerializer<GlobalWindow> duplicate() {
-			return this;
-		}
-
-		@Override
 		public GlobalWindow createInstance() {
 			return GlobalWindow.INSTANCE;
 		}
@@ -119,18 +115,8 @@ public class GlobalWindow extends Window {
 		}
 
 		@Override
-		public boolean equals(Object obj) {
-			return obj instanceof Serializer;
-		}
-
-		@Override
 		public boolean canEqual(Object obj) {
 			return obj instanceof Serializer;
 		}
-
-		@Override
-		public int hashCode() {
-			return 0;
-		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
index 6d896cb..a7ea244 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
@@ -26,7 +26,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -130,7 +130,7 @@ public class TimeWindow extends Window {
 	/**
 	 * The serializer used to write the TimeWindow type.
 	 */
-	public static class Serializer extends TypeSerializer<TimeWindow> {
+	public static class Serializer extends TypeSerializerSingleton<TimeWindow> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -139,11 +139,6 @@ public class TimeWindow extends Window {
 		}
 
 		@Override
-		public TypeSerializer<TimeWindow> duplicate() {
-			return this;
-		}
-
-		@Override
 		public TimeWindow createInstance() {
 			return null;
 		}
@@ -188,19 +183,9 @@ public class TimeWindow extends Window {
 		}
 
 		@Override
-		public boolean equals(Object obj) {
-			return obj instanceof Serializer;
-		}
-
-		@Override
 		public boolean canEqual(Object obj) {
 			return obj instanceof Serializer;
 		}
-
-		@Override
-		public int hashCode() {
-			return 0;
-		}
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index 7fe088a..5c52fa6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -22,7 +22,10 @@ import static java.util.Objects.requireNonNull;
 
 import java.io.IOException;
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -262,4 +265,54 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 	public int hashCode() {
 		return typeSerializer.hashCode();
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Serializer configuration snapshotting & compatibility
+	//
+	// This serializer may be used by Flink internal operators that need to checkpoint
+	// buffered records. Therefore, it may be part of managed state and need to implement
+	// the configuration snapshot and compatibility methods.
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public StreamElementSerializerConfigSnapshot snapshotConfiguration() {
+		return new StreamElementSerializerConfigSnapshot(typeSerializer.snapshotConfiguration());
+	}
+
+	@Override
+	public CompatibilityResult<StreamElement> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		if (configSnapshot instanceof StreamElementSerializerConfigSnapshot) {
+			CompatibilityResult<T> compatResult = typeSerializer.ensureCompatibility(
+				((StreamElementSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot());
+
+			if (!compatResult.requiresMigration()) {
+				return CompatibilityResult.compatible();
+			} else if (compatResult.getConvertDeserializer() != null) {
+				return CompatibilityResult.requiresMigration(
+					new StreamElementSerializer<>(compatResult.getConvertDeserializer()));
+			}
+		}
+
+		return CompatibilityResult.requiresMigration(null);
+	}
+
+	/**
+	 * Configuration snapshot specific to the {@link StreamElementSerializer}.
+	 */
+	public static final class StreamElementSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
+
+		private static final int VERSION = 1;
+
+		/** This empty nullary constructor is required for deserializing the configuration. */
+		public StreamElementSerializerConfigSnapshot() {}
+
+		public StreamElementSerializerConfigSnapshot(TypeSerializerConfigSnapshot typeSerializerConfigSnapshot) {
+			super(typeSerializerConfigSnapshot);
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
index a24a3a8..2693bc1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
@@ -228,6 +228,5 @@ public class CheckpointingCustomKvStateProgram {
 		public boolean canEqual(Object obj) {
 			return obj instanceof CustomIntSerializer;
 		}
-
 	}
 }


Mime
View raw message