flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/4] flink git commit: [hotfix] Remove some raw type usage in RocksDBKeyedStateBackend
Date Fri, 19 May 2017 06:11:07 GMT
[hotfix] Remove some raw type usage in RocksDBKeyedStateBackend

Introduce more generic parameters


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2af1a9f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2af1a9f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2af1a9f

Branch: refs/heads/master
Commit: f2af1a9f916bd9b941a48a1da577d19fc07badde
Parents: ef6f7b6
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu May 18 17:05:50 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu May 18 23:19:32 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 69 +++++++++-----------
 .../state/KeyedBackendSerializationProxy.java   | 10 +--
 .../state/heap/HeapKeyedStateBackend.java       | 12 ++--
 .../runtime/state/SerializationProxiesTest.java | 12 ++--
 4 files changed, 49 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f2af1a9f/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index ddc7e17..d0f73bf 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -123,8 +123,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackend.class);
 
-	private final JobID jobId;
-
 	private final String operatorIdentifier;
 
 	/** The column family options from the options factory */
@@ -165,7 +163,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 * TODO this map can be removed when eager-state registration is in place.
 	 * TODO we currently need this cached to check state migration strategies when new serializers are registered.
 	 */
-	private Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot> restoredKvStateMetaInfos;
+	private Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
 
 	/** Number of bytes required to prefix the key groups. */
 	private final int keyGroupPrefixBytes;
@@ -198,7 +196,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
 
-		this.jobId = Preconditions.checkNotNull(jobId);
 		this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);
 
 		this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
@@ -314,8 +311,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			final long checkpointTimestamp,
 			final CheckpointStreamFactory checkpointStreamFactory) throws Exception {
 
-		final RocksDBIncrementalSnapshotOperation snapshotOperation =
-			new RocksDBIncrementalSnapshotOperation(
+		final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
+			new RocksDBIncrementalSnapshotOperation<>(
 				this,
 				checkpointStreamFactory,
 				checkpointId,
@@ -365,7 +362,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		long startTime = System.currentTimeMillis();
 
-		final RocksDBFullSnapshotOperation snapshotOperation = new RocksDBFullSnapshotOperation(this, streamFactory);
+		final RocksDBFullSnapshotOperation<K> snapshotOperation = new RocksDBFullSnapshotOperation<>(this, streamFactory);
 		// hold the db lock while operation on the db to guard us against async db disposal
 		synchronized (asyncSnapshotLock) {
 
@@ -440,12 +437,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/**
 	 * Encapsulates the process to perform a snapshot of a RocksDBKeyedStateBackend.
 	 */
-	static final class RocksDBFullSnapshotOperation {
+	static final class RocksDBFullSnapshotOperation<K> {
 
 		static final int FIRST_BIT_IN_BYTE_MASK = 0x80;
 		static final int END_OF_KEY_GROUP_MARK = 0xFFFF;
 
-		private final RocksDBKeyedStateBackend<?> stateBackend;
+		private final RocksDBKeyedStateBackend<K> stateBackend;
 		private final KeyGroupRangeOffsets keyGroupRangeOffsets;
 		private final CheckpointStreamFactory checkpointStreamFactory;
 
@@ -461,7 +458,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		private KeyGroupsStateHandle snapshotResultStateHandle;
 
 		RocksDBFullSnapshotOperation(
-				RocksDBKeyedStateBackend<?> stateBackend,
+				RocksDBKeyedStateBackend<K> stateBackend,
 				CheckpointStreamFactory checkpointStreamFactory) {
 
 			this.stateBackend = stateBackend;
@@ -601,8 +598,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				++kvStateId;
 			}
 
-			KeyedBackendSerializationProxy serializationProxy =
-					new KeyedBackendSerializationProxy(stateBackend.getKeySerializer(), metaInfoSnapshots);
+			KeyedBackendSerializationProxy<K> serializationProxy =
+					new KeyedBackendSerializationProxy<>(stateBackend.getKeySerializer(), metaInfoSnapshots);
 
 			serializationProxy.write(outputView);
 		}
@@ -710,10 +707,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 	}
 
-	private static final class RocksDBIncrementalSnapshotOperation {
+	private static final class RocksDBIncrementalSnapshotOperation<K> {
 
 		/** The backend which we snapshot */
-		private final RocksDBKeyedStateBackend<?> stateBackend;
+		private final RocksDBKeyedStateBackend<K> stateBackend;
 
 		/** Stream factory that creates the outpus streams to DFS */
 		private final CheckpointStreamFactory checkpointStreamFactory;
@@ -748,7 +745,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		private StreamStateHandle metaStateHandle = null;
 
 		private RocksDBIncrementalSnapshotOperation(
-				RocksDBKeyedStateBackend<?> stateBackend,
+				RocksDBKeyedStateBackend<K> stateBackend,
 				CheckpointStreamFactory checkpointStreamFactory,
 				long checkpointId,
 				long checkpointTimestamp) {
@@ -810,8 +807,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
 				closeableRegistry.registerClosable(outputStream);
 
-				KeyedBackendSerializationProxy serializationProxy =
-					new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfoSnapshots);
+				KeyedBackendSerializationProxy<K> serializationProxy =
+					new KeyedBackendSerializationProxy<>(stateBackend.keySerializer, stateMetaInfoSnapshots);
 				DataOutputView out = new DataOutputViewStreamWrapper(outputStream);
 
 				serializationProxy.write(out);
@@ -964,10 +961,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				LOG.info("Converting RocksDB state from old savepoint.");
 				restoreOldSavepointKeyedState(restoreState);
 			} else if (restoreState.iterator().next() instanceof IncrementalKeyedStateHandle) {
-				RocksDBIncrementalRestoreOperation restoreOperation = new RocksDBIncrementalRestoreOperation(this);
+				RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
 				restoreOperation.restore(restoreState);
 			} else {
-				RocksDBFullRestoreOperation restoreOperation = new RocksDBFullRestoreOperation(this);
+				RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this);
 				restoreOperation.doRestore(restoreState);
 			}
 		} catch (Exception ex) {
@@ -1037,9 +1034,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/**
 	 * Encapsulates the process of restoring a RocksDBKeyedStateBackend from a snapshot.
 	 */
-	static final class RocksDBFullRestoreOperation {
+	static final class RocksDBFullRestoreOperation<K> {
 
-		private final RocksDBKeyedStateBackend<?> rocksDBKeyedStateBackend;
+		private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend;
 
 		/** Current key-groups state handle from which we restore key-groups */
 		private KeyGroupsStateHandle currentKeyGroupsStateHandle;
@@ -1055,7 +1052,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 *
 		 * @param rocksDBKeyedStateBackend the state backend into which we restore
 		 */
-		public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<?> rocksDBKeyedStateBackend) {
+		public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) {
 			this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend);
 		}
 
@@ -1116,11 +1113,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 * @throws ClassNotFoundException
 		 * @throws RocksDBException
 		 */
-		@SuppressWarnings("unchecked")
-		private void restoreKVStateMetaData() throws IOException, ClassNotFoundException, RocksDBException {
+		private void restoreKVStateMetaData() throws IOException, RocksDBException {
 
-			KeyedBackendSerializationProxy serializationProxy =
-					new KeyedBackendSerializationProxy(rocksDBKeyedStateBackend.userCodeClassLoader);
+			KeyedBackendSerializationProxy<K> serializationProxy =
+					new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
 
 			serializationProxy.read(currentStateHandleInView);
 
@@ -1130,7 +1126,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					serializationProxy.getKeySerializer(),
 					TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
 					serializationProxy.getKeySerializerConfigSnapshot(),
-					(TypeSerializer) rocksDBKeyedStateBackend.keySerializer)
+					rocksDBKeyedStateBackend.keySerializer)
 				.isRequiresMigration()) {
 
 				// TODO replace with state migration; note that key hash codes need to remain the same after migration
@@ -1221,15 +1217,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 	}
 
-	private static class RocksDBIncrementalRestoreOperation {
+	private static class RocksDBIncrementalRestoreOperation<T> {
 
-		private final RocksDBKeyedStateBackend<?> stateBackend;
+		private final RocksDBKeyedStateBackend<T> stateBackend;
 
-		private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<?> stateBackend) {
+		private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
 			this.stateBackend = stateBackend;
 		}
 
-		@SuppressWarnings("unchecked")
 		private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> readMetaData(
 				StreamStateHandle metaStateHandle) throws Exception {
 
@@ -1239,8 +1234,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				inputStream = metaStateHandle.openInputStream();
 				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
 
-				KeyedBackendSerializationProxy serializationProxy =
-					new KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader);
+				KeyedBackendSerializationProxy<T> serializationProxy =
+					new KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader);
 				DataInputView in = new DataInputViewStreamWrapper(inputStream);
 				serializationProxy.read(in);
 
@@ -1250,7 +1245,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 						serializationProxy.getKeySerializer(),
 						TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
 						serializationProxy.getKeySerializerConfigSnapshot(),
-						(TypeSerializer) stateBackend.keySerializer)
+						stateBackend.keySerializer)
 					.isRequiresMigration()) {
 
 					// TODO replace with state migration; note that key hash codes need to remain the same after migration
@@ -1536,7 +1531,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			// TODO with eager registration in place, these checks should be moved to restore()
 
 			RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo =
-				restoredKvStateMetaInfos.get(descriptor.getName());
+				(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) restoredKvStateMetaInfos.get(descriptor.getName());
 
 			Preconditions.checkState(
 				newMetaInfo.getName().equals(restoredMetaInfo.getName()),
@@ -1556,7 +1551,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			// check compatibility results to determine if state migration is required
 
-			CompatibilityResult<N> namespaceCompatibility = StateMigrationUtil.resolveCompatibilityResult(
+			CompatibilityResult<?> namespaceCompatibility = StateMigrationUtil.resolveCompatibilityResult(
 					restoredMetaInfo.getNamespaceSerializer(),
 					MigrationNamespaceSerializerProxy.class,
 					restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
@@ -1929,7 +1924,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					new InstantiationUtil.ClassLoaderObjectInputStream(
 							new DataInputViewStream(inputView), userCodeClassLoader);
 
-			StateDescriptor stateDescriptor = (StateDescriptor) ooIn.readObject();
+			StateDescriptor<?, ?> stateDescriptor = (StateDescriptor<?, ?>) ooIn.readObject();
 
 			columnFamilyMapping.put(mappingByte, stateDescriptor);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2af1a9f/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index 94fb9f1..f265f78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -39,11 +39,11 @@ import java.util.List;
  * Serialization proxy for all meta data in keyed state backends. In the future we might also requiresMigration the actual state
  * serialization logic here.
  */
-public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable {
+public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritable {
 
 	public static final int VERSION = 3;
 
-	private TypeSerializer<?> keySerializer;
+	private TypeSerializer<K> keySerializer;
 	private TypeSerializerConfigSnapshot keySerializerConfigSnapshot;
 
 	private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;
@@ -55,7 +55,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
 	}
 
 	public KeyedBackendSerializationProxy(
-			TypeSerializer<?> keySerializer,
+			TypeSerializer<K> keySerializer,
 			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) {
 
 		this.keySerializer = Preconditions.checkNotNull(keySerializer);
@@ -70,7 +70,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
 		return stateMetaInfoSnapshots;
 	}
 
-	public TypeSerializer<?> getKeySerializer() {
+	public TypeSerializer<K> getKeySerializer() {
 		return keySerializer;
 	}
 
@@ -122,7 +122,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
 	public void read(DataInputView in) throws IOException {
 		super.read(in);
 
-		final TypeSerializerSerializationProxy<?> keySerializerProxy =
+		final TypeSerializerSerializationProxy<K> keySerializerProxy =
 			new TypeSerializerSerializationProxy<>(userCodeClassLoader);
 
 		// only starting from version 3, we have the key serializer and its config snapshot written

http://git-wip-us.apache.org/repos/asf/flink/blob/f2af1a9f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 6eb314b..3e5645b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -272,8 +272,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			}
 		}
 
-		final KeyedBackendSerializationProxy serializationProxy =
-				new KeyedBackendSerializationProxy(keySerializer, metaInfoSnapshots);
+		final KeyedBackendSerializationProxy<K> serializationProxy =
+				new KeyedBackendSerializationProxy<>(keySerializer, metaInfoSnapshots);
 
 		//--------------------------------------------------- this becomes the end of sync part
 
@@ -383,8 +383,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			try {
 				DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);
 
-				KeyedBackendSerializationProxy serializationProxy =
-						new KeyedBackendSerializationProxy(userCodeClassLoader);
+				KeyedBackendSerializationProxy<K> serializationProxy =
+						new KeyedBackendSerializationProxy<>(userCodeClassLoader);
 
 				serializationProxy.read(inView);
 
@@ -395,7 +395,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 						serializationProxy.getKeySerializer(),
 						TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
 						serializationProxy.getKeySerializerConfigSnapshot(),
-						(TypeSerializer) keySerializer)
+						keySerializer)
 						.isRequiresMigration()) {
 
 						// TODO replace with state migration; note that key hash codes need to remain the same after migration
@@ -405,7 +405,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 					keySerializerRestored = true;
 				}
-				
+
 				List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
 						serializationProxy.getStateMetaInfoSnapshots();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2af1a9f/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 8bbbd5f..3d5b210 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -66,8 +66,8 @@ public class SerializationProxiesTest {
 		stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
 			StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot());
 
-		KeyedBackendSerializationProxy serializationProxy =
-				new KeyedBackendSerializationProxy(keySerializer, stateMetaInfoList);
+		KeyedBackendSerializationProxy<?> serializationProxy =
+				new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList);
 
 		byte[] serialized;
 		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
@@ -76,7 +76,7 @@ public class SerializationProxiesTest {
 		}
 
 		serializationProxy =
-				new KeyedBackendSerializationProxy(Thread.currentThread().getContextClassLoader());
+				new KeyedBackendSerializationProxy<>(Thread.currentThread().getContextClassLoader());
 
 		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
 			serializationProxy.read(new DataInputViewStreamWrapper(in));
@@ -103,8 +103,8 @@ public class SerializationProxiesTest {
 		stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
 			StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot());
 
-		KeyedBackendSerializationProxy serializationProxy =
-			new KeyedBackendSerializationProxy(keySerializer, stateMetaInfoList);
+		KeyedBackendSerializationProxy<?> serializationProxy =
+			new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList);
 
 		byte[] serialized;
 		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
@@ -113,7 +113,7 @@ public class SerializationProxiesTest {
 		}
 
 		serializationProxy =
-			new KeyedBackendSerializationProxy(Thread.currentThread().getContextClassLoader());
+			new KeyedBackendSerializationProxy<>(Thread.currentThread().getContextClassLoader());
 
 		// mock failure when deserializing serializers
 		TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class);


Mime
View raw message