flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject [1/3] flink git commit: [FLINK-6565] Fail memory-backed state restores with meaningful message if previous serializer is unavailable
Date Sat, 13 May 2017 09:03:41 GMT
Repository: flink
Updated Branches:
  refs/heads/master 7173774d0 -> 947c44e86


[FLINK-6565] Fail memory-backed state restores with meaningful message if previous serializer is unavailable

This closes #3882.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c594af09
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c594af09
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c594af09

Branch: refs/heads/master
Commit: c594af09767e2ef1e74dd8db187985460761b724
Parents: 7173774
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Fri May 12 19:11:25 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Sat May 13 14:37:49 2017 +0800

----------------------------------------------------------------------
 .../state/DefaultOperatorStateBackend.java      |  17 +++
 ...ckendStateMetaInfoSnapshotReaderWriters.java |   8 ++
 ...ckendStateMetaInfoSnapshotReaderWriters.java |   6 +
 .../state/heap/HeapKeyedStateBackend.java       |  17 +++
 .../runtime/state/MemoryStateBackendTest.java   | 135 +++++++++++++++++++
 .../runtime/state/OperatorStateBackendTest.java |  70 +++++++++-
 .../runtime/state/StateBackendTestBase.java     |   2 +-
 7 files changed, 251 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c594af09/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index ab0c1f0..1d3af72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -293,6 +294,22 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
 				// Recreate all PartitionableListStates from the meta info
 				for (RegisteredOperatorBackendStateMetaInfo.Snapshot<?> restoredMetaInfo : restoredMetaInfoSnapshots) {
+
+					if (restoredMetaInfo.getPartitionStateSerializer() == null ||
+							restoredMetaInfo.getPartitionStateSerializer()
+								instanceof TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer) {
+
+						// must fail now if the previous serializer cannot be restored because there is no serializer
+						// capable of reading previous state
+						// TODO when eager state registration is in place, we can try to get a convert deserializer
+						// TODO from the newly registered serializer instead of simply failing here
+
+						throw new IOException("Unable to restore operator state [" + restoredMetaInfo.getName() + "]." +
+							" The previous serializer of the operator state must be present; the serializer could" +
+							" have been removed from the classpath, or its implementation have changed and could" +
+							" not be loaded. This is a temporary restriction that will be fixed in future versions.");
+					}
+
 					PartitionableListState<?> listState = registeredStates.get(restoredMetaInfo.getName());
 
 					if (null == listState) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c594af09/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
index 83aa335..ac81e86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
@@ -28,6 +28,8 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -37,6 +39,8 @@ import java.io.IOException;
  */
 public class KeyedBackendStateMetaInfoSnapshotReaderWriters {
 
+	private static final Logger LOG = LoggerFactory.getLogger(KeyedBackendStateMetaInfoSnapshotReaderWriters.class);
+
 	// -------------------------------------------------------------------------------
 	//  Writers
 	//   - v1: Flink 1.2.x
@@ -230,6 +234,8 @@ public class KeyedBackendStateMetaInfoSnapshotReaderWriters {
 				namespaceSerializerProxy.read(inViewWrapper);
 				metaInfo.setNamespaceSerializer(namespaceSerializerProxy.getTypeSerializer());
 			} catch (IOException e) {
+				LOG.warn("Deserialization of previous namespace serializer errored; setting serializer to null. ", e);
+
 				metaInfo.setNamespaceSerializer(null);
 			}
 
@@ -241,6 +247,8 @@ public class KeyedBackendStateMetaInfoSnapshotReaderWriters {
 				stateSerializerProxy.read(inViewWrapper);
 				metaInfo.setStateSerializer(stateSerializerProxy.getTypeSerializer());
 			} catch (IOException e) {
+				LOG.warn("Deserialization of previous state serializer errored; setting serializer to null. ", e);
+
 				metaInfo.setStateSerializer(null);
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c594af09/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
index 9ab106b..4f151c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
@@ -30,6 +30,8 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -39,6 +41,8 @@ import java.io.IOException;
  */
 public class OperatorBackendStateMetaInfoSnapshotReaderWriters {
 
+	private static final Logger LOG = LoggerFactory.getLogger(OperatorBackendStateMetaInfoSnapshotReaderWriters.class);
+
 	// -------------------------------------------------------------------------------
 	//  Writers
 	//   - v1: Flink 1.2.x
@@ -219,6 +223,8 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters {
 				partitionStateSerializerProxy.read(inViewWrapper);
 				stateMetaInfo.setPartitionStateSerializer(partitionStateSerializerProxy.getTypeSerializer());
 			} catch (IOException e) {
+				LOG.warn("Deserialization of previous serializer errored; setting serializer to null. ", e);
+
 				stateMetaInfo.setPartitionStateSerializer(null);
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c594af09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 866ed28..bc314df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -389,6 +390,22 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 				for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) {
 
+					if (restoredMetaInfo.getStateSerializer() == null ||
+							restoredMetaInfo.getStateSerializer()
+								instanceof TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer) {
+
+						// must fail now if the previous serializer cannot be restored because there is no serializer
+						// capable of reading previous state
+						// TODO when eager state registration is in place, we can try to get a convert deserializer
+						// TODO from the newly registered serializer instead of simply failing here
+
+						throw new IOException("Unable to restore keyed state [" + restoredMetaInfo.getName() + "]." +
+							" For memory-backed keyed state, the previous serializer of the keyed state must be" +
+							" present; the serializer could have been removed from the classpath, or its implementation" +
+							" have changed and could not be loaded. This is a temporary restriction that will be fixed" +
+							" in future versions.");
+					}
+
 					StateTable<K, ?, ?> stateTable = stateTables.get(restoredMetaInfo.getName());
 
 					//important: only create a new table we did not already create it previously

http://git-wip-us.apache.org/repos/asf/flink/blob/c594af09/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index 48d56e2..fee97f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -20,27 +20,48 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.FutureUtil;
+import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.concurrent.RunnableFuture;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for the {@link org.apache.flink.runtime.state.memory.MemoryStateBackend}.
  */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({KeyedBackendStateMetaInfoSnapshotReaderWriters.class, OperatorBackendStateMetaInfoSnapshotReaderWriters.class})
 public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBackend>
{
 
 	@Override
@@ -198,6 +219,120 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack
 		}
 	}
 
+	/**
+	 * Verifies that the operator state backend fails with appropriate error and message if
+	 * previous serializer can not be restored.
+	 */
+	@Test
+	public void testOperatorStateRestoreFailsIfSerializerDeserializationFails() throws Exception {
+		AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
+
+		Environment env = mock(Environment.class);
+		when(env.getExecutionConfig()).thenReturn(new ExecutionConfig());
+		when(env.getUserClassLoader()).thenReturn(OperatorStateBackendTest.class.getClassLoader());
+
+		OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(env, "test-op-name");
+
+		// write some state
+		ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
+		ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
+		ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>());
+		ListState<Serializable> listState1 = operatorStateBackend.getListState(stateDescriptor1);
+		ListState<Serializable> listState2 = operatorStateBackend.getListState(stateDescriptor2);
+		ListState<Serializable> listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
+
+		listState1.add(42);
+		listState1.add(4711);
+
+		listState2.add(7);
+		listState2.add(13);
+		listState2.add(23);
+
+		listState3.add(17);
+		listState3.add(18);
+		listState3.add(19);
+		listState3.add(20);
+
+		CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
+		RunnableFuture<OperatorStateHandle> runnableFuture =
+			operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint());
+		OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(runnableFuture);
+
+		try {
+
+			operatorStateBackend.close();
+			operatorStateBackend.dispose();
+
+			operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
+				env,
+				"testOperator");
+
+			// mock failure when deserializing serializer
+			TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class);
+			doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
+			PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+
+			operatorStateBackend.restore(Collections.singletonList(stateHandle));
+
+			fail("The operator state restore should have failed if the previous state serializer could not be loaded.");
+		} catch (IOException expected) {
+			Assert.assertTrue(expected.getMessage().contains("Unable to restore operator state"));
+		} finally {
+			stateHandle.discardState();
+		}
+	}
+
+	/**
+	 * Verifies that memory-backed keyed state backend fails with appropriate error and message if
+	 * previous serializer can not be restored.
+	 */
+	@Test
+	public void testKeyedStateRestoreFailsIfSerializerDeserializationFails() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		KeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null);
+		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+
+		HeapKeyedStateBackend<Integer> heapBackend = (HeapKeyedStateBackend<Integer>) backend;
+
+		assertEquals(0, heapBackend.numStateEntries());
+
+		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		// write some state
+		backend.setCurrentKey(0);
+		state.update("hello");
+		state.update("ciao");
+
+		KeyedStateHandle snapshot = runSnapshot(((HeapKeyedStateBackend<Integer>) backend).snapshot(
+			682375462378L,
+			2,
+			streamFactory,
+			CheckpointOptions.forFullCheckpoint()));
+
+		backend.dispose();
+
+		// ========== restore snapshot ==========
+
+		Environment env = mock(Environment.class);
+		when(env.getExecutionConfig()).thenReturn(new ExecutionConfig());
+		when(env.getUserClassLoader()).thenReturn(OperatorStateBackendTest.class.getClassLoader());
+
+		// mock failure when deserializing serializer
+		TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class);
+		doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
+		PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+
+		try {
+			restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
+
+			fail("The keyed state restore should have failed if the previous state serializer could not be loaded.");
+		} catch (IOException expected) {
+			Assert.assertTrue(expected.getMessage().contains("Unable to restore keyed state"));
+		}
+	}
+
 	@Ignore
 	@Test
 	public void testConcurrentMapIfQueryable() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/c594af09/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 85b9eaf..af5f0b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -22,7 +22,9 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
@@ -32,6 +34,10 @@ import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.util.FutureUtil;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.File;
 import java.io.IOException;
@@ -52,9 +58,13 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(OperatorBackendStateMetaInfoSnapshotReaderWriters.class)
 public class OperatorStateBackendTest {
 
 	private final ClassLoader classLoader = getClass().getClassLoader();
@@ -290,7 +300,7 @@ public class OperatorStateBackendTest {
 
 	@Test
 	public void testSnapshotRestoreAsync() throws Exception {
-		DefaultOperatorStateBackend operatorStateBackend =
+		OperatorStateBackend operatorStateBackend =
 				new DefaultOperatorStateBackend(OperatorStateBackendTest.class.getClassLoader(), new
ExecutionConfig(), true);
 
 		ListStateDescriptor<MutableType> stateDescriptor1 =
@@ -362,8 +372,7 @@ public class OperatorStateBackendTest {
 
 			AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
 
-			//TODO this is temporarily casted to test already functionality that we do not yet expose
through public API
-			operatorStateBackend = (DefaultOperatorStateBackend) abstractStateBackend.createOperatorStateBackend(
+			operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
 					createMockEnvironment(),
 					"testOperator");
 
@@ -494,6 +503,61 @@ public class OperatorStateBackendTest {
 		}
 	}
 
+	@Test
+	public void testRestoreFailsIfSerializerDeserializationFails() throws Exception {
+		AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
+
+		OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(createMockEnvironment(),
"test-op-name");
+
+		// write some state
+		ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1",
new JavaSerializer<>());
+		ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2",
new JavaSerializer<>());
+		ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3",
new JavaSerializer<>());
+		ListState<Serializable> listState1 = operatorStateBackend.getListState(stateDescriptor1);
+		ListState<Serializable> listState2 = operatorStateBackend.getListState(stateDescriptor2);
+		ListState<Serializable> listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
+
+		listState1.add(42);
+		listState1.add(4711);
+
+		listState2.add(7);
+		listState2.add(13);
+		listState2.add(23);
+
+		listState3.add(17);
+		listState3.add(18);
+		listState3.add(19);
+		listState3.add(20);
+
+		CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(),
"testOperator");
+		RunnableFuture<OperatorStateHandle> runnableFuture =
+			operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint());
+		OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(runnableFuture);
+
+		try {
+
+			operatorStateBackend.close();
+			operatorStateBackend.dispose();
+
+			operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
+				createMockEnvironment(),
+				"testOperator");
+
+			// mock failure when deserializing serializer
+			TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class);
+			doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
+			PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+
+			operatorStateBackend.restore(Collections.singletonList(stateHandle));
+
+			fail("The operator state restore should have failed if the previous state serializer could
not be loaded.");
+		} catch (IOException expected) {
+			Assert.assertTrue(expected.getMessage().contains("Unable to restore operator state"));
+		} finally {
+			stateHandle.discardState();
+		}
+	}
+
 	static final class MutableType implements Serializable {
 
 		private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/c594af09/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 96025fe..658ccde 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -2508,7 +2508,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend>
exten
 		}
 	}
 
-	private KeyedStateHandle runSnapshot(RunnableFuture<KeyedStateHandle> snapshotRunnableFuture) throws Exception {
+	protected KeyedStateHandle runSnapshot(RunnableFuture<KeyedStateHandle> snapshotRunnableFuture) throws Exception {
 		if(!snapshotRunnableFuture.isDone()) {
 			Thread runner = new Thread(snapshotRunnableFuture);
 			runner.start();


Mime
View raw message