flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [3/3] flink git commit: [FLINK-5995] [checkpoints] Fix serializer initialization for Operator State
Date Mon, 20 Mar 2017 14:41:03 GMT
[FLINK-5995] [checkpoints] Fix serializer initialization for Operator State

This closes #3503


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/614abd29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/614abd29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/614abd29

Branch: refs/heads/master
Commit: 614abd29ee2a33be5cd98a6ce55abd1b605fc296
Parents: 486f724
Author: 金竹 <jincheng.sunjc@alibaba-inc.com>
Authored: Fri Mar 10 09:29:57 2017 +0800
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Mar 20 13:20:03 2017 +0100

----------------------------------------------------------------------
 .../runtime/state/AbstractStateBackend.java     |  2 +-
 .../state/DefaultOperatorStateBackend.java      | 12 ++++++++--
 .../runtime/state/OperatorStateBackendTest.java | 25 ++++++++++++++++++++
 3 files changed, 36 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/614abd29/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index 2cf20a1..74025bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -95,7 +95,7 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri
 			Environment env,
 			String operatorIdentifier) throws Exception {
 
-		return new DefaultOperatorStateBackend(env.getUserClassLoader());
+		return new DefaultOperatorStateBackend(env.getUserClassLoader(), env.getExecutionConfig());
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/614abd29/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 2497a00..71cccae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -56,15 +57,20 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend
{
 	private final CloseableRegistry closeStreamOnCancelRegistry;
 	private final JavaSerializer<Serializable> javaSerializer;
 	private final ClassLoader userClassloader;
+	private final ExecutionConfig executionConfig;
 
-	public DefaultOperatorStateBackend(ClassLoader userClassLoader) throws IOException {
-
+	public DefaultOperatorStateBackend(ClassLoader userClassLoader, ExecutionConfig executionConfig)
throws IOException {
 		this.closeStreamOnCancelRegistry = new CloseableRegistry();
 		this.userClassloader = Preconditions.checkNotNull(userClassLoader);
+		this.executionConfig = executionConfig;
 		this.javaSerializer = new JavaSerializer<>();
 		this.registeredStates = new HashMap<>();
 	}
 
+	public ExecutionConfig getExecutionConfig() {
+		return executionConfig;
+	}
+
 	@Override
 	public Set<String> getRegisteredStateNames() {
 		return registeredStates.keySet();
@@ -106,6 +112,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend
{
 
 		Preconditions.checkNotNull(stateDescriptor);
 
+		stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
+
 		String name = Preconditions.checkNotNull(stateDescriptor.getName());
 		TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getElementSerializer());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/614abd29/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index d883d6e..157d5ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -28,6 +29,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.Serializable;
+import java.io.File;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.concurrent.RunnableFuture;
@@ -45,6 +47,8 @@ public class OperatorStateBackendTest {
 
 	static Environment createMockEnvironment() {
 		Environment env = mock(Environment.class);
+		ExecutionConfig config = mock(ExecutionConfig.class);
+		when(env.getExecutionConfig()).thenReturn(config);
 		when(env.getUserClassLoader()).thenReturn(Thread.currentThread().getContextClassLoader());
 		return env;
 	}
@@ -64,6 +68,27 @@ public class OperatorStateBackendTest {
 	}
 
 	@Test
+	public void testRegisterStatesWithoutTypeSerializer() throws Exception {
+		DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
+		ListStateDescriptor<File> stateDescriptor = new ListStateDescriptor<>("test",
File.class);
+		ListStateDescriptor<String> stateDescriptor2 = new ListStateDescriptor<>("test2",
String.class);
+		ListState<File> listState = operatorStateBackend.getOperatorState(stateDescriptor);
+		assertNotNull(listState);
+		ListState<String> listState2 = operatorStateBackend.getOperatorState(stateDescriptor2);
+		assertNotNull(listState2);
+		assertEquals(2, operatorStateBackend.getRegisteredStateNames().size());
+		Iterator<String> it = listState2.get().iterator();
+		assertTrue(!it.hasNext());
+		listState2.add("kevin");
+		listState2.add("sunny");
+
+		it = listState2.get().iterator();
+		assertEquals("kevin", it.next());
+		assertEquals("sunny", it.next());
+		assertTrue(!it.hasNext());
+	}
+
+	@Test
 	public void testRegisterStates() throws Exception {
 		DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
 		ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1",
new JavaSerializer<>());


Mime
View raw message