flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject [1/8] flink git commit: [FLINK-6714] [runtime] Use user classloader for operator state copying on snapshots
Date Fri, 26 May 2017 08:42:00 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.3 401e99759 -> 7fe4df336


[FLINK-6714] [runtime] Use user classloader for operator state copying on snapshots

This closes #3987.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0454234
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0454234
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0454234

Branch: refs/heads/release-1.3
Commit: e0454234748f838799f905e9271253dfa808a797
Parents: 401e997
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Thu May 25 16:54:08 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Fri May 26 16:31:19 2017 +0800

----------------------------------------------------------------------
 .../state/DefaultOperatorStateBackend.java      | 21 ++++++----
 .../runtime/state/OperatorStateBackendTest.java | 42 ++++++++++++++++++++
 2 files changed, 55 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e0454234/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index eec2e93..0f96dac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -185,13 +185,18 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend
{
 				new HashMap<>(registeredStates.size());
 
 		// eagerly create deep copies of the list states in the sync phase, so that we can use
them in the async writing
-		for (Map.Entry<String, PartitionableListState<?>> entry : this.registeredStates.entrySet())
{
-
-			PartitionableListState<?> listState = entry.getValue();
-			if (null != listState) {
-				listState = listState.deepCopy();
+		ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
+		Thread.currentThread().setContextClassLoader(userClassloader);
+		try {
+			for (Map.Entry<String, PartitionableListState<?>> entry : this.registeredStates.entrySet())
{
+				PartitionableListState<?> listState = entry.getValue();
+				if (null != listState) {
+					listState = listState.deepCopy();
+				}
+				registeredStatesDeepCopies.put(entry.getKey(), listState);
 			}
-			registeredStatesDeepCopies.put(entry.getKey(), listState);
+		} finally {
+			Thread.currentThread().setContextClassLoader(snapshotClassLoader);
 		}
 
 		// implementation of the async IO operation, based on FutureTask
@@ -258,8 +263,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend
{
 			task.run();
 		}
 
-		LOG.info("DefaultOperatorStateBackend snapshot (" + streamFactory + ", synchronous part)
in thread " +
-				Thread.currentThread() + " took " + (System.currentTimeMillis() - syncStartTime) + "
ms.");
+		LOG.info("DefaultOperatorStateBackend snapshot ({}, synchronous part) in thread {} took
{} ms.",
+				streamFactory, Thread.currentThread(), (System.currentTimeMillis() - syncStartTime));
 
 		return task;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e0454234/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 31b75c7..d44f6c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -35,6 +36,9 @@ import org.apache.flink.util.FutureUtil;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -61,6 +65,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
@@ -208,6 +213,43 @@ public class OperatorStateBackendTest {
 		}
 	}
 
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testCorrectClassLoaderUsedOnSnapshot() throws Exception {
+
+		AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
+
+		final Environment env = createMockEnvironment();
+		OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(env,
"test-op-name");
+
+		// mock serializer which tests that on copy, the correct classloader is used as the context
classloader
+		TypeSerializer<Integer> mockSerializer = mock(TypeSerializer.class);
+		when(mockSerializer.copy(Matchers.any(Integer.class))).thenAnswer(new Answer<Object>()
{
+			@Override
+			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				Assert.assertEquals(env.getUserClassLoader(), Thread.currentThread().getContextClassLoader());
+				return null;
+			}
+		});
+		// return actual serializers / config snapshots so that the snapshot proceeds properly
+		when(mockSerializer.duplicate()).thenReturn(IntSerializer.INSTANCE);
+		when(mockSerializer.snapshotConfiguration()).thenReturn(IntSerializer.INSTANCE.snapshotConfiguration());
+
+		// write some state
+		ListStateDescriptor<Integer> stateDescriptor = new ListStateDescriptor<>("test",
mockSerializer);
+		ListState<Integer> listState = operatorStateBackend.getListState(stateDescriptor);
+
+		listState.add(42);
+
+		CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(),
"testOperator");
+		RunnableFuture<OperatorStateHandle> runnableFuture =
+			operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint());
+		FutureUtil.runIfNotDoneAndGet(runnableFuture);
+
+		// make sure that the method of interest is called
+		verify(mockSerializer).copy(Matchers.any(Integer.class));
+	}
+
 	@Test
 	public void testSnapshotEmpty() throws Exception {
 		final AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);


Mime
View raw message