flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kklou...@apache.org
Subject [5/5] flink git commit: [FLINK-8802] [QS] Fix concurrent access to non-duplicated serializers.
Date Thu, 29 Mar 2018 15:59:45 GMT
[FLINK-8802] [QS] Fix concurrent access to non-duplicated serializers.

This closes #5691.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db8e1f09
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db8e1f09
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db8e1f09

Branch: refs/heads/master
Commit: db8e1f09bd7dcd9f392bf987e96cddcb34665b6c
Parents: c16e2c9
Author: kkloudas <kkloudas@gmail.com>
Authored: Fri Mar 9 22:47:35 2018 +0100
Committer: kkloudas <kkloudas@gmail.com>
Committed: Thu Mar 29 17:35:39 2018 +0200

----------------------------------------------------------------------
 .../flink/queryablestate/network/Client.java    |   2 +-
 .../server/KvStateServerHandler.java            |  26 ++-
 .../queryablestate/network/ClientTest.java      |   4 +-
 .../KVStateRequestSerializerRocksDBTest.java    |  16 +-
 .../network/KvStateRequestSerializerTest.java   |  30 ++-
 .../network/KvStateServerHandlerTest.java       |  46 +++-
 .../flink/runtime/query/KvStateEntry.java       |  75 +++++++
 .../apache/flink/runtime/query/KvStateInfo.java | 114 ++++++++++
 .../flink/runtime/query/KvStateRegistry.java    |  21 +-
 .../runtime/query/TaskKvStateRegistry.java      |   4 +-
 .../state/AbstractKeyedStateBackend.java        |  23 +-
 .../state/heap/AbstractHeapMergingState.java    |   9 +-
 .../runtime/state/heap/AbstractHeapState.java   |  31 ++-
 .../state/heap/HeapAggregatingState.java        |  19 +-
 .../runtime/state/heap/HeapFoldingState.java    |  20 +-
 .../state/heap/HeapKeyedStateBackend.java       |  16 +-
 .../flink/runtime/state/heap/HeapListState.java |  47 +++-
 .../flink/runtime/state/heap/HeapMapState.java  |  55 +++--
 .../runtime/state/heap/HeapReducingState.java   |  20 +-
 .../runtime/state/heap/HeapValueState.java      |  20 +-
 .../internal/InternalAggregatingState.java      |  13 +-
 .../state/internal/InternalAppendingState.java  |  12 +-
 .../state/internal/InternalFoldingState.java    |   5 +-
 .../runtime/state/internal/InternalKvState.java |  37 +++-
 .../state/internal/InternalListState.java       |   6 +-
 .../state/internal/InternalMapState.java        |   5 +-
 .../state/internal/InternalMergingState.java    |  12 +-
 .../state/internal/InternalReducingState.java   |   5 +-
 .../state/internal/InternalValueState.java      |   5 +-
 .../runtime/query/KvStateRegistryTest.java      | 212 ++++++++++++++++++-
 .../runtime/state/StateBackendTestBase.java     |  96 +++++----
 .../state/StateSnapshotCompressionTest.java     |   4 +-
 ...pKeyedStateBackendSnapshotMigrationTest.java |   2 +-
 .../streaming/state/AbstractRocksDBState.java   |  58 +++--
 .../state/RocksDBAggregatingState.java          |  19 +-
 .../streaming/state/RocksDBFoldingState.java    |  20 +-
 .../state/RocksDBKeyedStateBackend.java         |  12 +-
 .../streaming/state/RocksDBListState.java       |  19 +-
 .../streaming/state/RocksDBMapState.java        | 152 +++++++++----
 .../streaming/state/RocksDBReducingState.java   |  19 +-
 .../streaming/state/RocksDBValueState.java      |  19 +-
 .../windowing/EvictingWindowOperator.java       |   4 +-
 .../operators/windowing/WindowOperator.java     |  14 +-
 .../operators/windowing/TriggerTestHarness.java |   2 +-
 44 files changed, 1074 insertions(+), 276 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
index 364f835..6b60492 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
@@ -281,7 +281,7 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 		 * @param request the request to be sent.
 		 * @return Future holding the serialized result
 		 */
-		public CompletableFuture<RESP> sendRequest(REQ request) {
+		CompletableFuture<RESP> sendRequest(REQ request) {
 			synchronized (connectLock) {
 				if (failureCause != null) {
 					return FutureUtils.getFailedFuture(failureCause);

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
index 18a2944..d46deff 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
@@ -26,6 +26,8 @@ import org.apache.flink.queryablestate.messages.KvStateResponse;
 import org.apache.flink.queryablestate.network.AbstractServerHandler;
 import org.apache.flink.queryablestate.network.messages.MessageSerializer;
 import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
+import org.apache.flink.runtime.query.KvStateEntry;
+import org.apache.flink.runtime.query.KvStateInfo;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.ExceptionUtils;
@@ -33,9 +35,6 @@ import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -50,8 +49,6 @@ import java.util.concurrent.CompletableFuture;
 @ChannelHandler.Sharable
 public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalRequest, KvStateResponse> {
 
-	private static final Logger LOG = LoggerFactory.getLogger(KvStateServerHandler.class);
-
 	/** KvState registry holding references to the KvState instances. */
 	private final KvStateRegistry registry;
 
@@ -78,13 +75,13 @@ public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalR
 		final CompletableFuture<KvStateResponse> responseFuture = new CompletableFuture<>();
 
 		try {
-			final InternalKvState<?> kvState = registry.getKvState(request.getKvStateId());
+			final KvStateEntry<?, ?, ?> kvState = registry.getKvState(request.getKvStateId());
 			if (kvState == null) {
 				responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId()));
 			} else {
 				byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace();
 
-				byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace);
+				byte[] serializedResult = getSerializedValue(kvState, serializedKeyAndNamespace);
 				if (serializedResult != null) {
 					responseFuture.complete(new KvStateResponse(serializedResult));
 				} else {
@@ -100,6 +97,21 @@ public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalR
 		}
 	}
 
+	private static <K, N, V> byte[] getSerializedValue(
+			final KvStateEntry<K, N, V> entry,
+			final byte[] serializedKeyAndNamespace) throws Exception {
+
+		final InternalKvState<K, N, V> state = entry.getState();
+		final KvStateInfo<K, N, V> infoForCurrentThread = entry.getInfoForCurrentThread();
+
+		return state.getSerializedValue(
+				serializedKeyAndNamespace,
+				infoForCurrentThread.getKeySerializer(),
+				infoForCurrentThread.getNamespaceSerializer(),
+				infoForCurrentThread.getStateValueSerializer()
+		);
+	}
+
 	@Override
 	public CompletableFuture<Void> shutdown() {
 		return CompletableFuture.completedFuture(null);

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
index 6aa4710..bceb361 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
@@ -685,8 +685,8 @@ public class ClientTest {
 
 				state.update(201 + i);
 
-				// we know it must be a KvStat but this is not exposed to the user via State
-				InternalKvState<?> kvState = (InternalKvState<?>) state;
+				// we know it must be a KvState but this is not exposed to the user via State
+				InternalKvState<Integer, ?, Integer> kvState = (InternalKvState<Integer, ?, Integer>) state;
 
 				// Register KvState (one state instance for all server)
 				ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState);

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
index dd75dd6..4985bf3 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
@@ -41,6 +41,7 @@ import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 
 import java.io.File;
+import java.util.Map;
 
 import static org.mockito.Mockito.mock;
 
@@ -82,7 +83,7 @@ public final class KVStateRequestSerializerRocksDBTest {
 		}
 
 		@Override
-		public <N, T> InternalListState<N, T> createListState(
+		public <N, T> InternalListState<K, N, T> createListState(
 			final TypeSerializer<N> namespaceSerializer,
 			final ListStateDescriptor<T> stateDesc) throws Exception {
 
@@ -120,7 +121,7 @@ public final class KVStateRequestSerializerRocksDBTest {
 		longHeapKeyedStateBackend.restore(null);
 		longHeapKeyedStateBackend.setCurrentKey(key);
 
-		final InternalListState<VoidNamespace, Long> listState = longHeapKeyedStateBackend
+		final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend
 			.createListState(VoidNamespaceSerializer.INSTANCE,
 				new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
 
@@ -159,11 +160,12 @@ public final class KVStateRequestSerializerRocksDBTest {
 		longHeapKeyedStateBackend.restore(null);
 		longHeapKeyedStateBackend.setCurrentKey(key);
 
-		final InternalMapState<VoidNamespace, Long, String> mapState = (InternalMapState<VoidNamespace, Long, String>)
-				longHeapKeyedStateBackend.getPartitionedState(
-						VoidNamespace.INSTANCE,
-						VoidNamespaceSerializer.INSTANCE,
-						new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
+		final InternalMapState<Long, VoidNamespace, Long, String, Map<Long, String>> mapState =
+				(InternalMapState<Long, VoidNamespace, Long, String, Map<Long, String>>)
+						longHeapKeyedStateBackend.getPartitionedState(
+								VoidNamespace.INSTANCE,
+								VoidNamespaceSerializer.INSTANCE,
+								new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
 
 		KvStateRequestSerializerTest.testMapSerialization(key, mapState);
 		longHeapKeyedStateBackend.dispose();

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
index 8d10141..dac1b90 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
@@ -200,7 +200,7 @@ public class KvStateRequestSerializerTest {
 			);
 		longHeapKeyedStateBackend.setCurrentKey(key);
 
-		final InternalListState<VoidNamespace, Long> listState = longHeapKeyedStateBackend.createListState(
+		final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createListState(
 				VoidNamespaceSerializer.INSTANCE,
 				new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
 
@@ -220,7 +220,7 @@ public class KvStateRequestSerializerTest {
 	 */
 	public static void testListSerialization(
 			final long key,
-			final InternalListState<VoidNamespace, Long> listState) throws Exception {
+			final InternalListState<Long, VoidNamespace, Long> listState) throws Exception {
 
 		TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
 		listState.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -240,7 +240,11 @@ public class KvStateRequestSerializerTest {
 				key, LongSerializer.INSTANCE,
 				VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
 
-		final byte[] serializedValues = listState.getSerializedValue(serializedKey);
+		final byte[] serializedValues = listState.getSerializedValue(
+				serializedKey,
+				listState.getKeySerializer(),
+				listState.getNamespaceSerializer(),
+				listState.getValueSerializer());
 
 		List<Long> actualValues = KvStateSerializer.deserializeList(serializedValues, valueSerializer);
 		assertEquals(expectedValues, actualValues);
@@ -303,10 +307,12 @@ public class KvStateRequestSerializerTest {
 			);
 		longHeapKeyedStateBackend.setCurrentKey(key);
 
-		final InternalMapState<VoidNamespace, Long, String> mapState = (InternalMapState<VoidNamespace, Long, String>) longHeapKeyedStateBackend.getPartitionedState(
-				VoidNamespace.INSTANCE,
-				VoidNamespaceSerializer.INSTANCE,
-				new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
+		final InternalMapState<Long, VoidNamespace, Long, String, HashMap<Long, String>> mapState =
+				(InternalMapState<Long, VoidNamespace, Long, String, HashMap<Long, String>>)
+						longHeapKeyedStateBackend.getPartitionedState(
+								VoidNamespace.INSTANCE,
+								VoidNamespaceSerializer.INSTANCE,
+								new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
 
 		testMapSerialization(key, mapState);
 	}
@@ -322,9 +328,9 @@ public class KvStateRequestSerializerTest {
 	 *
 	 * @throws Exception
 	 */
-	public static void testMapSerialization(
+	public static <M extends Map<Long, String>> void testMapSerialization(
 			final long key,
-			final InternalMapState<VoidNamespace, Long, String> mapState) throws Exception {
+			final InternalMapState<Long, VoidNamespace, Long, String, M> mapState) throws Exception {
 
 		TypeSerializer<Long> userKeySerializer = LongSerializer.INSTANCE;
 		TypeSerializer<String> userValueSerializer = StringSerializer.INSTANCE;
@@ -348,7 +354,11 @@ public class KvStateRequestSerializerTest {
 				key, LongSerializer.INSTANCE,
 				VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
 
-		final byte[] serializedValues = mapState.getSerializedValue(serializedKey);
+		final byte[] serializedValues = mapState.getSerializedValue(
+				serializedKey,
+				mapState.getKeySerializer(),
+				mapState.getNamespaceSerializer(),
+				mapState.getValueSerializer());
 
 		Map<Long, String> actualValues = KvStateSerializer.deserializeMap(serializedValues, userKeySerializer, userValueSerializer);
 		assertEquals(expectedValues.size(), actualValues.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index 8b1517c..9947dac 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -21,7 +21,9 @@ package org.apache.flink.queryablestate.network;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.queryablestate.KvStateID;
@@ -70,9 +72,6 @@ import java.util.concurrent.TimeoutException;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link KvStateServerHandler}.
@@ -286,7 +285,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 	}
 
 	/**
-	 * Tests the failure response on a failure on the {@link InternalKvState#getSerializedValue(byte[])} call.
+	 * Tests the failure response on a failure on the {@link InternalKvState#getSerializedValue(byte[], TypeSerializer, TypeSerializer, TypeSerializer)} call.
 	 */
 	@Test
 	public void testFailureOnGetSerializedValue() throws Exception {
@@ -300,9 +299,42 @@ public class KvStateServerHandlerTest extends TestLogger {
 		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
 
 		// Failing KvState
-		InternalKvState<?> kvState = mock(InternalKvState.class);
-		when(kvState.getSerializedValue(any(byte[].class)))
-				.thenThrow(new RuntimeException("Expected test Exception"));
+		InternalKvState<Integer, VoidNamespace, Long> kvState =
+				new InternalKvState<Integer, VoidNamespace, Long>() {
+					@Override
+					public TypeSerializer<Integer> getKeySerializer() {
+						return IntSerializer.INSTANCE;
+					}
+
+					@Override
+					public TypeSerializer<VoidNamespace> getNamespaceSerializer() {
+						return VoidNamespaceSerializer.INSTANCE;
+					}
+
+					@Override
+					public TypeSerializer<Long> getValueSerializer() {
+						return LongSerializer.INSTANCE;
+					}
+
+					@Override
+					public void setCurrentNamespace(VoidNamespace namespace) {
+						// do nothing
+					}
+
+					@Override
+					public byte[] getSerializedValue(
+							final byte[] serializedKeyAndNamespace,
+							final TypeSerializer<Integer> safeKeySerializer,
+							final TypeSerializer<VoidNamespace> safeNamespaceSerializer,
+							final TypeSerializer<Long> safeValueSerializer) throws Exception {
+						throw new RuntimeException("Expected test Exception");
+					}
+
+					@Override
+					public void clear() {
+
+					}
+				};
 
 		KvStateID kvStateId = registry.registerKvState(
 				new JobID(),

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java
new file mode 100644
index 0000000..0bd132f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * An entry holding the {@link InternalKvState} along with its {@link KvStateInfo}.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace the state is associated to
+ * @param <V> The type of values kept internally in state
+ */
+@Internal
+public class KvStateEntry<K, N, V> {
+
+	private final InternalKvState<K, N, V> state;
+	private final KvStateInfo<K, N, V> stateInfo;
+
+	private final boolean areSerializersStateless;
+
+	private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> serializerCache;
+
+	public KvStateEntry(final InternalKvState<K, N, V> state) {
+		this.state = Preconditions.checkNotNull(state);
+		this.stateInfo = new KvStateInfo<>(
+				state.getKeySerializer(),
+				state.getNamespaceSerializer(),
+				state.getValueSerializer()
+		);
+		this.serializerCache = new ConcurrentHashMap<>();
+		this.areSerializersStateless = stateInfo.duplicate() == stateInfo;
+	}
+
+	public InternalKvState<K, N, V> getState() {
+		return state;
+	}
+
+	public KvStateInfo<K, N, V> getInfoForCurrentThread() {
+		return areSerializersStateless
+				? stateInfo
+				: serializerCache.computeIfAbsent(Thread.currentThread(), t -> stateInfo.duplicate());
+	}
+
+	public void clear() {
+		serializerCache.clear();
+	}
+
+	@VisibleForTesting
+	public int getCacheSize() {
+		return serializerCache.size();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateInfo.java
new file mode 100644
index 0000000..aa94e41
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateInfo.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+/**
+ * Metadata about a {@link InternalKvState}. This includes the serializers for
+ * the key, the namespace, and the values kept in the state.
+ *
+ * @param <K>	The type of key the state is associated to
+ * @param <N>	The type of the namespace the state is associated to
+ * @param <V>	The type of values kept internally in state
+ */
+public class KvStateInfo<K, N, V> {
+
+	private final TypeSerializer<K> keySerializer;
+	private final TypeSerializer<N> namespaceSerializer;
+	private final TypeSerializer<V> stateValueSerializer;
+
+	public KvStateInfo(
+			final TypeSerializer<K> keySerializer,
+			final TypeSerializer<N> namespaceSerializer,
+			final TypeSerializer<V> stateValueSerializer
+	) {
+		this.keySerializer = Preconditions.checkNotNull(keySerializer);
+		this.namespaceSerializer = Preconditions.checkNotNull(namespaceSerializer);
+		this.stateValueSerializer = Preconditions.checkNotNull(stateValueSerializer);
+	}
+
+	/**
+	 * @return The serializer for the key the state is associated to.
+	 */
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	/**
+	 * @return The serializer for the namespace the state is associated to.
+	 */
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	/**
+	 * @return The serializer for the values kept in the state.
+	 */
+	public TypeSerializer<V> getStateValueSerializer() {
+		return stateValueSerializer;
+	}
+
+	/**
+	 * Creates a deep copy of the current {@link KvStateInfo} by duplicating
+	 * all the included serializers.
+	 *
+	 * <p>This method assumes correct implementation of the {@link TypeSerializer#duplicate()}
+	 * method of the included serializers.
+	 */
+	public KvStateInfo<K, N, V> duplicate() {
+		final TypeSerializer<K> dupKeySerializer = keySerializer.duplicate();
+		final TypeSerializer<N> dupNamespaceSerializer = namespaceSerializer.duplicate();
+		final TypeSerializer<V> dupSVSerializer = stateValueSerializer.duplicate();
+
+		if (
+			dupKeySerializer == keySerializer &&
+			dupNamespaceSerializer == namespaceSerializer &&
+			dupSVSerializer == stateValueSerializer
+		) {
+			return this;
+		}
+
+		return new KvStateInfo<>(dupKeySerializer, dupNamespaceSerializer, dupSVSerializer);
+
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		KvStateInfo<?, ?, ?> stateInfo = (KvStateInfo<?, ?, ?>) o;
+		return Objects.equals(keySerializer, stateInfo.keySerializer) &&
+				Objects.equals(namespaceSerializer, stateInfo.namespaceSerializer) &&
+				Objects.equals(stateValueSerializer, stateInfo.stateValueSerializer);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(keySerializer, namespaceSerializer, stateValueSerializer);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index 2c55463..63d3c52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -41,8 +41,7 @@ import java.util.concurrent.ConcurrentHashMap;
 public class KvStateRegistry {
 
 	/** All registered KvState instances. */
-	private final ConcurrentHashMap<KvStateID, InternalKvState<?>> registeredKvStates =
-			new ConcurrentHashMap<>();
+	private final ConcurrentHashMap<KvStateID, KvStateEntry<?, ?, ?>> registeredKvStates = new ConcurrentHashMap<>(4);
 
 	/** Registry listeners to be notified on registration/unregistration. */
 	private final ConcurrentHashMap<JobID, KvStateRegistryListener> listeners = new ConcurrentHashMap<>(4);
@@ -86,11 +85,11 @@ public class KvStateRegistry {
 			JobVertexID jobVertexId,
 			KeyGroupRange keyGroupRange,
 			String registrationName,
-			InternalKvState<?> kvState) {
+			InternalKvState<?, ?, ?> kvState) {
 
 		KvStateID kvStateId = new KvStateID();
 
-		if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) {
+		if (registeredKvStates.putIfAbsent(kvStateId, new KvStateEntry<>(kvState)) == null) {
 			final KvStateRegistryListener listener = getKvStateRegistryListener(jobId);
 
 			if (listener != null) {
@@ -123,7 +122,10 @@ public class KvStateRegistry {
 			String registrationName,
 			KvStateID kvStateId) {
 
-		if (registeredKvStates.remove(kvStateId) != null) {
+		KvStateEntry<?, ?, ?> entry = registeredKvStates.remove(kvStateId);
+		if (entry != null) {
+			entry.clear();
+
 			final KvStateRegistryListener listener = getKvStateRegistryListener(jobId);
 			if (listener != null) {
 				listener.notifyKvStateUnregistered(
@@ -136,13 +138,13 @@ public class KvStateRegistry {
 	}
 
 	/**
-	 * Returns the KvState instance identified by the given KvStateID or
-	 * <code>null</code> if none is registered.
+	 * Returns the {@link KvStateEntry} containing the requested instance as identified by the
+	 * given KvStateID, along with its {@link KvStateInfo} or <code>null</code> if none is registered.
 	 *
 	 * @param kvStateId KvStateID to identify the KvState instance
-	 * @return KvState instance identified by the KvStateID or <code>null</code>
+	 * @return The {@link KvStateEntry} instance identified by the KvStateID or <code>null</code> if there is none
 	 */
-	public InternalKvState<?> getKvState(KvStateID kvStateId) {
+	public KvStateEntry<?, ?, ?> getKvState(KvStateID kvStateId) {
 		return registeredKvStates.get(kvStateId);
 	}
 
@@ -174,5 +176,4 @@ public class KvStateRegistry {
 		}
 		return listener;
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
index f799b5a..a44a508 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
@@ -60,7 +60,7 @@ public class TaskKvStateRegistry {
 	 *                         descriptor used to create the KvState instance)
 	 * @param kvState          The
 	 */
-	public void registerKvState(KeyGroupRange keyGroupRange, String registrationName, InternalKvState<?> kvState) {
+	public void registerKvState(KeyGroupRange keyGroupRange, String registrationName, InternalKvState<?, ?, ?> kvState) {
 		KvStateID kvStateId = registry.registerKvState(jobId, jobVertexId, keyGroupRange, registrationName, kvState);
 		registeredKvStates.add(new KvStateInfo(keyGroupRange, registrationName, kvStateId));
 	}
@@ -85,7 +85,7 @@ public class TaskKvStateRegistry {
 
 		private final KvStateID kvStateId;
 
-		public KvStateInfo(KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId) {
+		KvStateInfo(KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId) {
 			this.keyGroupRange = keyGroupRange;
 			this.registrationName = registrationName;
 			this.kvStateId = kvStateId;

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index e7b3a1a..287474c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -53,6 +53,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -79,7 +80,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	private int currentKeyGroup;
 
 	/** So that we can give out state when the user uses the same key. */
-	protected final HashMap<String, InternalKvState<?>> keyValueStatesByName;
+	protected final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
 
 	/** For caching the last accessed partitioned state */
 	private String lastName;
@@ -161,7 +162,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	 * @param <N> The type of the namespace.
 	 * @param <T> The type of the value that the {@code ValueState} can store.
 	 */
-	protected abstract <N, T> InternalValueState<N, T> createValueState(
+	protected abstract <N, T> InternalValueState<K, N, T> createValueState(
 			TypeSerializer<N> namespaceSerializer,
 			ValueStateDescriptor<T> stateDesc) throws Exception;
 
@@ -174,7 +175,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	 * @param <N> The type of the namespace.
 	 * @param <T> The type of the values that the {@code ListState} can store.
 	 */
-	protected abstract <N, T> InternalListState<N, T> createListState(
+	protected abstract <N, T> InternalListState<K, N, T> createListState(
 			TypeSerializer<N> namespaceSerializer,
 			ListStateDescriptor<T> stateDesc) throws Exception;
 
@@ -187,7 +188,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	 * @param <N> The type of the namespace.
 	 * @param <T> The type of the values that the {@code ListState} can store.
 	 */
-	protected abstract <N, T> InternalReducingState<N, T> createReducingState(
+	protected abstract <N, T> InternalReducingState<K, N, T> createReducingState(
 			TypeSerializer<N> namespaceSerializer,
 			ReducingStateDescriptor<T> stateDesc) throws Exception;
 
@@ -200,7 +201,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	 * @param <N> The type of the namespace.
 	 * @param <T> The type of the values that the {@code ListState} can store.
 	 */
-	protected abstract <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState(
+	protected abstract <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
 			TypeSerializer<N> namespaceSerializer,
 			AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception;
 
@@ -217,7 +218,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	 * @deprecated will be removed in a future version
 	 */
 	@Deprecated
-	protected abstract <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(
+	protected abstract <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
 			TypeSerializer<N> namespaceSerializer,
 			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
 
@@ -231,7 +232,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	 * @param <UK> Type of the keys in the state
 	 * @param <UV> Type of the values in the state	 *
 	 */
-	protected abstract <N, UK, UV> InternalMapState<N, UK, UV> createMapState(
+	protected abstract <N, UK, UV> InternalMapState<K, N, UK, UV, ? extends Map<UK, UV>> createMapState(
 			TypeSerializer<N> namespaceSerializer,
 			MapStateDescriptor<UK, UV> stateDesc) throws Exception;
 
@@ -336,7 +337,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 			stateDescriptor.initializeSerializerUnlessSet(executionConfig);
 		}
 
-		InternalKvState<?> existing = keyValueStatesByName.get(stateDescriptor.getName());
+		InternalKvState<K, ?, ?> existing = keyValueStatesByName.get(stateDescriptor.getName());
 		if (existing != null) {
 			@SuppressWarnings("unchecked")
 			S typedState = (S) existing;
@@ -379,7 +380,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 		});
 
 		@SuppressWarnings("unchecked")
-		InternalKvState<N> kvState = (InternalKvState<N>) state;
+		InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;
 		keyValueStatesByName.put(stateDescriptor.getName(), kvState);
 
 		// Publish queryable state
@@ -416,7 +417,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 			return (S) lastState;
 		}
 
-		InternalKvState<?> previous = keyValueStatesByName.get(stateDescriptor.getName());
+		InternalKvState<K, ?, ?> previous = keyValueStatesByName.get(stateDescriptor.getName());
 		if (previous != null) {
 			lastState = previous;
 			lastState.setCurrentNamespace(namespace);
@@ -425,7 +426,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 		}
 
 		final S state = getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
-		final InternalKvState<N> kvState = (InternalKvState<N>) state;
+		final InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;
 
 		lastName = stateDescriptor.getName();
 		lastState = kvState;

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
index 3e76423..df762b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
@@ -28,18 +28,19 @@ import org.apache.flink.runtime.state.internal.InternalMergingState;
 import java.util.Collection;
 
 /**
- * Base class for {@link MergingState} ({@link org.apache.flink.runtime.state.internal.InternalMergingState})
- * that is stored on the heap.
+ * Base class for {@link MergingState} ({@link InternalMergingState}) that is stored on the heap.
  *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
+ * @param <IN> The type of the input elements.
  * @param <SV> The type of the values in the state.
+ * @param <OUT> The type of the output elements.
  * @param <S> The type of State
  * @param <SD> The type of StateDescriptor for the State S
  */
-public abstract class AbstractHeapMergingState<K, N, IN, OUT, SV, S extends State, SD extends StateDescriptor<S, ?>>
+public abstract class AbstractHeapMergingState<K, N, IN, SV, OUT, S extends State, SD extends StateDescriptor<S, SV>>
 		extends AbstractHeapState<K, N, SV, S, SD>
-		implements InternalMergingState<N, IN, OUT> {
+		implements InternalMergingState<K, N, IN, SV, OUT> {
 
 	/**
 	 * The merge transformation function that implements the merge logic.

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
index 7f629ae..e889c53 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
@@ -37,8 +37,7 @@ import org.apache.flink.util.Preconditions;
  * @param <S> The type of State
  * @param <SD> The type of StateDescriptor for the State S
  */
-public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
-		implements InternalKvState<N> {
+public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> implements InternalKvState<K, N, SV> {
 
 	/** Map containing the actual key/value pairs. */
 	protected final StateTable<K, N, SV> stateTable;
@@ -86,28 +85,26 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
 	}
 
 	@Override
-	public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
-		Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
+	public byte[] getSerializedValue(
+			final byte[] serializedKeyAndNamespace,
+			final TypeSerializer<K> safeKeySerializer,
+			final TypeSerializer<N> safeNamespaceSerializer,
+			final TypeSerializer<SV> safeValueSerializer) throws Exception {
 
-		Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
-				serializedKeyAndNamespace, keySerializer, namespaceSerializer);
-
-		return getSerializedValue(keyAndNamespace.f0, keyAndNamespace.f1);
-	}
+		Preconditions.checkNotNull(serializedKeyAndNamespace);
+		Preconditions.checkNotNull(safeKeySerializer);
+		Preconditions.checkNotNull(safeNamespaceSerializer);
+		Preconditions.checkNotNull(safeValueSerializer);
 
-	public byte[] getSerializedValue(K key, N namespace) throws Exception {
-		Preconditions.checkState(namespace != null, "No namespace given.");
-		Preconditions.checkState(key != null, "No key given.");
+		Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
+				serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer);
 
-		SV result = stateTable.get(key, namespace);
+		SV result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1);
 
 		if (result == null) {
 			return null;
 		}
-
-		@SuppressWarnings("unchecked,rawtypes")
-		TypeSerializer serializer = stateDesc.getSerializer();
-		return KvStateSerializer.serializeValue(result, serializer);
+		return KvStateSerializer.serializeValue(result, safeValueSerializer);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
index 6dd5cec..8e58ac8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
@@ -39,8 +39,8 @@ import java.io.IOException;
  * @param <OUT> The type of the value returned from the state.
  */
 public class HeapAggregatingState<K, N, IN, ACC, OUT>
-		extends AbstractHeapMergingState<K, N, IN, OUT, ACC, AggregatingState<IN, OUT>, AggregatingStateDescriptor<IN, ACC, OUT>>
-		implements InternalAggregatingState<N, IN, OUT> {
+		extends AbstractHeapMergingState<K, N, IN, ACC, OUT, AggregatingState<IN, OUT>, AggregatingStateDescriptor<IN, ACC, OUT>>
+		implements InternalAggregatingState<K, N, IN, ACC, OUT> {
 
 	private final AggregateTransformation<IN, ACC, OUT> aggregateTransformation;
 
@@ -64,6 +64,21 @@ public class HeapAggregatingState<K, N, IN, ACC, OUT>
 		this.aggregateTransformation = new AggregateTransformation<>(stateDesc.getAggregateFunction());
 	}
 
+	@Override
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	@Override
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	@Override
+	public TypeSerializer<ACC> getValueSerializer() {
+		return stateDesc.getSerializer();
+	}
+
 	// ------------------------------------------------------------------------
 	//  state access
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
index 3a77cca..ed1d0de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
@@ -29,8 +29,7 @@ import org.apache.flink.util.Preconditions;
 import java.io.IOException;
 
 /**
- * Heap-backed partitioned {@link FoldingState} that is
- * snapshotted into files.
+ * Heap-backed partitioned {@link FoldingState} that is snapshotted into files.
  *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
@@ -42,7 +41,7 @@ import java.io.IOException;
 @Deprecated
 public class HeapFoldingState<K, N, T, ACC>
 		extends AbstractHeapState<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>>
-		implements InternalFoldingState<N, T, ACC> {
+		implements InternalFoldingState<K, N, T, ACC> {
 
 	/** The function used to fold the state */
 	private final FoldTransformation<T, ACC> foldTransformation;
@@ -63,6 +62,21 @@ public class HeapFoldingState<K, N, T, ACC>
 		this.foldTransformation = new FoldTransformation<>(stateDesc);
 	}
 
+	@Override
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	@Override
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	@Override
+	public TypeSerializer<ACC> getValueSerializer() {
+		return stateDesc.getSerializer();
+	}
+
 	// ------------------------------------------------------------------------
 	//  state access
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 63eb33b..82f883c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -92,8 +92,7 @@ import java.util.stream.Stream;
 
 /**
  * A {@link AbstractKeyedStateBackend} that keeps state on the Java Heap and will serialize state to
- * streams provided by a {@link org.apache.flink.runtime.state.CheckpointStreamFactory} upon
- * checkpointing.
+ * streams provided by a {@link CheckpointStreamFactory} upon checkpointing.
  *
  * @param <K> The key by which state is keyed.
  */
@@ -247,7 +246,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	public <N, V> InternalValueState<N, V> createValueState(
+	public <N, V> InternalValueState<K, N, V> createValueState(
 			TypeSerializer<N> namespaceSerializer,
 			ValueStateDescriptor<V> stateDesc) throws Exception {
 
@@ -256,7 +255,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	public <N, T> InternalListState<N, T> createListState(
+	public <N, T> InternalListState<K, N, T> createListState(
 			TypeSerializer<N> namespaceSerializer,
 			ListStateDescriptor<T> stateDesc) throws Exception {
 
@@ -265,7 +264,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	public <N, T> InternalReducingState<N, T> createReducingState(
+	public <N, T> InternalReducingState<K, N, T> createReducingState(
 			TypeSerializer<N> namespaceSerializer,
 			ReducingStateDescriptor<T> stateDesc) throws Exception {
 
@@ -274,7 +273,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	public <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState(
+	public <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
 			TypeSerializer<N> namespaceSerializer,
 			AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
 
@@ -283,7 +282,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	public <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(
+	public <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
 			TypeSerializer<N> namespaceSerializer,
 			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
 
@@ -292,7 +291,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	public <N, UK, UV> InternalMapState<N, UK, UV> createMapState(TypeSerializer<N> namespaceSerializer,
+	protected <N, UK, UV> InternalMapState<K, N, UK, UV, ? extends Map<UK, UV>> createMapState(
+			TypeSerializer<N> namespaceSerializer,
 			MapStateDescriptor<UK, UV> stateDesc) throws Exception {
 
 		StateTable<K, N, HashMap<UK, UV>> stateTable = tryRegisterStateTable(

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index f7b5cd2..bd68560 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -21,7 +21,10 @@ package org.apache.flink.runtime.state.heap;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.util.Preconditions;
 
@@ -30,16 +33,15 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * Heap-backed partitioned {@link org.apache.flink.api.common.state.ListState} that is snapshotted
- * into files.
+ * Heap-backed partitioned {@link ListState} that is snapshotted into files.
  *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <V> The type of the value.
  */
 public class HeapListState<K, N, V>
-		extends AbstractHeapMergingState<K, N, V, Iterable<V>, List<V>, ListState<V>, ListStateDescriptor<V>>
-		implements InternalListState<N, V> {
+		extends AbstractHeapMergingState<K, N, V, List<V>, Iterable<V>, ListState<V>, ListStateDescriptor<V>>
+		implements InternalListState<K, N, V> {
 
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
@@ -56,6 +58,21 @@ public class HeapListState<K, N, V>
 		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
 	}
 
+	@Override
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	@Override
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	@Override
+	public TypeSerializer<List<V>> getValueSerializer() {
+		return stateDesc.getSerializer();
+	}
+
 	// ------------------------------------------------------------------------
 	//  state access
 	// ------------------------------------------------------------------------
@@ -82,24 +99,34 @@ public class HeapListState<K, N, V>
 	}
 
 	@Override
-	public byte[] getSerializedValue(K key, N namespace) throws Exception {
-		Preconditions.checkState(namespace != null, "No namespace given.");
-		Preconditions.checkState(key != null, "No key given.");
+	public byte[] getSerializedValue(
+			final byte[] serializedKeyAndNamespace,
+			final TypeSerializer<K> safeKeySerializer,
+			final TypeSerializer<N> safeNamespaceSerializer,
+			final TypeSerializer<List<V>> safeValueSerializer) throws Exception {
+
+		Preconditions.checkNotNull(serializedKeyAndNamespace);
+		Preconditions.checkNotNull(safeKeySerializer);
+		Preconditions.checkNotNull(safeNamespaceSerializer);
+		Preconditions.checkNotNull(safeValueSerializer);
+
+		Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
+				serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer);
 
-		List<V> result = stateTable.get(key, namespace);
+		List<V> result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1);
 
 		if (result == null) {
 			return null;
 		}
 
-		TypeSerializer<V> serializer = stateDesc.getElementSerializer();
+		final TypeSerializer<V> dupSerializer = ((ListSerializer<V>) safeValueSerializer).getElementSerializer();
 
 		ByteArrayOutputStream baos = new ByteArrayOutputStream();
 		DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos);
 
 		// write the same as RocksDB writes lists, with one ',' separator
 		for (int i = 0; i < result.size(); i++) {
-			serializer.serialize(result.get(i), view);
+			dupSerializer.serialize(result.get(i), view);
 			if (i < result.size() -1) {
 				view.writeByte(',');
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
index 206f10a..7c18071 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
@@ -21,11 +21,12 @@ package org.apache.flink.runtime.state.heap;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
+import org.apache.flink.runtime.state.HashMapSerializer;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.util.Preconditions;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -33,14 +34,14 @@ import java.util.Map;
 /**
  * Heap-backed partitioned {@link MapState} that is snapshotted into files.
  *
- * @param <K>  The type of the key.
- * @param <N>  The type of the namespace.
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
  * @param <UK> The type of the keys in the state.
  * @param <UV> The type of the values in the state.
  */
 public class HeapMapState<K, N, UK, UV>
 		extends AbstractHeapState<K, N, HashMap<UK, UV>, MapState<UK, UV>, MapStateDescriptor<UK, UV>>
-		implements InternalMapState<N, UK, UV> {
+		implements InternalMapState<K, N, UK, UV, HashMap<UK, UV>> {
 
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
@@ -58,6 +59,24 @@ public class HeapMapState<K, N, UK, UV>
 	}
 
 	@Override
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	@Override
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	@Override
+	public TypeSerializer<HashMap<UK, UV>> getValueSerializer() {
+		return new HashMapSerializer<>(
+				stateDesc.getKeySerializer(),
+				stateDesc.getValueSerializer()
+		);
+	}
+
+	@Override
 	public UV get(UK userKey) {
 
 		HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
@@ -140,19 +159,31 @@ public class HeapMapState<K, N, UK, UV>
 	}
 
 	@Override
-	public byte[] getSerializedValue(K key, N namespace) throws IOException {
-		Preconditions.checkState(namespace != null, "No namespace given.");
-		Preconditions.checkState(key != null, "No key given.");
+	public byte[] getSerializedValue(
+			final byte[] serializedKeyAndNamespace,
+			final TypeSerializer<K> safeKeySerializer,
+			final TypeSerializer<N> safeNamespaceSerializer,
+			final TypeSerializer<HashMap<UK, UV>> safeValueSerializer) throws Exception {
 
-		HashMap<UK, UV> result = stateTable.get(key, namespace);
+		Preconditions.checkNotNull(serializedKeyAndNamespace);
+		Preconditions.checkNotNull(safeKeySerializer);
+		Preconditions.checkNotNull(safeNamespaceSerializer);
+		Preconditions.checkNotNull(safeValueSerializer);
 
-		if (null == result) {
+		Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
+				serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer);
+
+		Map<UK, UV> result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1);
+
+		if (result == null) {
 			return null;
 		}
 
-		TypeSerializer<UK> userKeySerializer = stateDesc.getKeySerializer();
-		TypeSerializer<UV> userValueSerializer = stateDesc.getValueSerializer();
+		final HashMapSerializer<UK, UV> serializer = (HashMapSerializer<UK, UV>) safeValueSerializer;
+
+		final TypeSerializer<UK> dupUserKeySerializer = serializer.getKeySerializer();
+		final TypeSerializer<UV> dupUserValueSerializer = serializer.getValueSerializer();
 
-		return KvStateSerializer.serializeMap(result.entrySet(), userKeySerializer, userValueSerializer);
+		return KvStateSerializer.serializeMap(result.entrySet(), dupUserKeySerializer, dupUserValueSerializer);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
index 6e11327..58b3128 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
@@ -29,8 +29,7 @@ import org.apache.flink.util.Preconditions;
 import java.io.IOException;
 
 /**
- * Heap-backed partitioned {@link org.apache.flink.api.common.state.ReducingState} that is
- * snapshotted into files.
+ * Heap-backed partitioned {@link ReducingState} that is snapshotted into files.
  *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
@@ -38,7 +37,7 @@ import java.io.IOException;
  */
 public class HeapReducingState<K, N, V>
 		extends AbstractHeapMergingState<K, N, V, V, V, ReducingState<V>, ReducingStateDescriptor<V>>
-		implements InternalReducingState<N, V> {
+		implements InternalReducingState<K, N, V> {
 
 	private final ReduceTransformation<V> reduceTransformation;
 
@@ -59,6 +58,21 @@ public class HeapReducingState<K, N, V>
 		this.reduceTransformation = new ReduceTransformation<>(stateDesc.getReduceFunction());
 	}
 
+	@Override
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	@Override
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	@Override
+	public TypeSerializer<V> getValueSerializer() {
+		return stateDesc.getSerializer();
+	}
+
 	// ------------------------------------------------------------------------
 	//  state access
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
index 6de62a8..bf0a3cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
@@ -24,8 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 
 /**
- * Heap-backed partitioned {@link org.apache.flink.api.common.state.ValueState} that is snapshotted
- * into files.
+ * Heap-backed partitioned {@link ValueState} that is snapshotted into files.
  *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
@@ -33,7 +32,7 @@ import org.apache.flink.runtime.state.internal.InternalValueState;
  */
 public class HeapValueState<K, N, V>
 		extends AbstractHeapState<K, N, V, ValueState<V>, ValueStateDescriptor<V>>
-		implements InternalValueState<N, V> {
+		implements InternalValueState<K, N, V> {
 
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
@@ -51,6 +50,21 @@ public class HeapValueState<K, N, V>
 	}
 
 	@Override
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	@Override
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	@Override
+	public TypeSerializer<V> getValueSerializer() {
+		return stateDesc.getSerializer();
+	}
+
+	@Override
 	public V value() {
 		final V result = stateTable.get(currentNamespace);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java
index 15a8e31..b66404c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java
@@ -24,10 +24,11 @@ import org.apache.flink.api.common.state.AggregatingState;
  * The peer to the {@link AggregatingState} in the internal state type hierarchy.
  * 
  * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
- * 
- * @param <N>   The type of the namespace
- * @param <IN>  Type of the value added to the state.
- * @param <OUT> Type of the value extracted from the state.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <IN> Type of the value added to the state
+ * @param <SV> The type of elements in the state
+ * @param <OUT> Type of the value extracted from the state
  */
-public interface InternalAggregatingState<N, IN, OUT> 
-		extends InternalMergingState<N, IN, OUT>, AggregatingState<IN, OUT> {}
+public interface InternalAggregatingState<K, N, IN, SV, OUT> extends InternalMergingState<K, N, IN, SV, OUT>, AggregatingState<IN, OUT> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
index ae9f457..3cb84af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
@@ -24,9 +24,11 @@ import org.apache.flink.api.common.state.AppendingState;
  * The peer to the {@link AppendingState} in the internal state type hierarchy.
  * 
  * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
- * 
- * @param <N>   The type of the namespace
- * @param <IN>  The type of elements added to the state
- * @param <OUT> The type of the 
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <IN> The type of elements added to the state
+ * @param <SV> The type of elements in the state
+ * @param <OUT> The type of the resulting element in the state
  */
-public interface InternalAppendingState<N, IN, OUT> extends InternalKvState<N>, AppendingState<IN, OUT> {}
+public interface InternalAppendingState<K, N, IN, SV, OUT> extends InternalKvState<K, N, SV>, AppendingState<IN, OUT> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
index 4ef258f..ed53d82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
@@ -24,7 +24,8 @@ import org.apache.flink.api.common.state.FoldingState;
  * The peer to the {@link FoldingState} in the internal state type hierarchy.
  * 
  * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
- * 
+ *
+ * @param <K> The type of key the state is associated to
  * @param <N> The type of the namespace
  * @param <T> Type of the values folded into the state
  * @param <ACC> Type of the value in the state
@@ -32,4 +33,4 @@ import org.apache.flink.api.common.state.FoldingState;
  * @deprecated will be removed in a future version
  */
 @Deprecated
-public interface InternalFoldingState<N, T, ACC> extends InternalAppendingState<N, T, ACC>, FoldingState<T, ACC> {}
+public interface InternalFoldingState<K, N, T, ACC> extends InternalAppendingState<K, N, T, ACC, ACC>, FoldingState<T, ACC> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
index 06f64b6..1310dd2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.internal;
 
 import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 /**
  * The {@code InternalKvState} is the root of the internal state type hierarchy, similar to the
@@ -52,10 +53,27 @@ import org.apache.flink.api.common.state.State;
  *                  |                |
  *                  +---------InternalReducingState
  * </pre>
- * 
- * @param <N> The type of the namespace.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <V> The type of values kept internally in state
  */
-public interface InternalKvState<N> extends State {
+public interface InternalKvState<K, N, V> extends State {
+
+	/**
+	 * Returns the {@link TypeSerializer} for the type of key this state is associated to.
+	 */
+	TypeSerializer<K> getKeySerializer();
+
+	/**
+	 * Returns the {@link TypeSerializer} for the type of namespace this state is associated to.
+	 */
+	TypeSerializer<N> getNamespaceSerializer();
+
+	/**
+	 * Returns the {@link TypeSerializer} for the type of value this state holds.
+	 */
+	TypeSerializer<V> getValueSerializer();
 
 	/**
 	 * Sets the current namespace, which will be used when using the state access methods.
@@ -70,10 +88,21 @@ public interface InternalKvState<N> extends State {
 	 * <p>If no value is associated with key and namespace, <code>null</code>
 	 * is returned.
 	 *
+	 * <p><b>TO IMPLEMENTERS:</b> This method is called by multiple threads. Anything
+	 * stateful (e.g. serializers) should be either duplicated or protected from undesired
+	 * consequences of concurrent invocations.
+	 *
 	 * @param serializedKeyAndNamespace Serialized key and namespace
+	 * @param safeKeySerializer A key serializer which is safe to be used even in multi-threaded context
+	 * @param safeNamespaceSerializer A namespace serializer which is safe to be used even in multi-threaded context
+	 * @param safeValueSerializer A value serializer which is safe to be used even in multi-threaded context
 	 * @return Serialized value or <code>null</code> if no value is associated with the key and namespace.
 	 * 
 	 * @throws Exception Exceptions during serialization are forwarded
 	 */
-	byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception;
+	byte[] getSerializedValue(
+			final byte[] serializedKeyAndNamespace,
+			final TypeSerializer<K> safeKeySerializer,
+			final TypeSerializer<N> safeNamespaceSerializer,
+			final TypeSerializer<V> safeValueSerializer) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
index 1e22dc6..1d6653b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
@@ -26,11 +26,13 @@ import java.util.List;
  * The peer to the {@link ListState} in the internal state type hierarchy.
  * 
  * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
- * 
+ *
+ * @param <K> The type of key the state is associated to
  * @param <N> The type of the namespace
  * @param <T> The type of elements in the list
  */
-public interface InternalListState<N, T> extends InternalMergingState<N, T, Iterable<T>>, ListState<T> {
+public interface InternalListState<K, N, T> extends InternalMergingState<K, N, T, List<T>, Iterable<T>>, ListState<T> {
+
 	/**
 	 * Updates the operator state accessible by {@link #get()} by updating existing values to
 	 * to the given list of values. The next time {@link #get()} is called (for the same state

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java
index f2a7b41..91f698c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java
@@ -20,13 +20,16 @@ package org.apache.flink.runtime.state.internal;
 
 import org.apache.flink.api.common.state.MapState;
 
+import java.util.Map;
+
 /**
  * The peer to the {@link MapState} in the internal state type hierarchy.
  *
  * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
  *
+ * @param <K> The type of key the state is associated to
  * @param <N> The type of the namespace
  * @param <UK> Type of the values folded into the state
  * @param <UV> Type of the value in the state
  */
-public interface InternalMapState<N, UK, UV> extends InternalKvState<N>, MapState<UK, UV> {}
+public interface InternalMapState<K, N, UK, UV, ST extends Map<UK, UV>> extends InternalKvState<K, N, ST>, MapState<UK, UV> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java
index abc7d7c..2c72697 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java
@@ -26,12 +26,14 @@ import java.util.Collection;
  * The peer to the {@link MergingState} in the internal state type hierarchy.
  * 
  * See {@link InternalKvState} for a description of the internal state hierarchy.
- * 
- * @param <N>   The type of the namespace
- * @param <IN>  The type of elements added to the state
- * @param <OUT> The type of elements 
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <IN> The type of elements added to the state
+ * @param <SV> The type of elements in the state
+ * @param <OUT> The type of the resulting element in the state
  */
-public interface InternalMergingState<N, IN, OUT> extends InternalAppendingState<N, IN, OUT>, MergingState<IN, OUT> {
+public interface InternalMergingState<K, N, IN, SV, OUT> extends InternalAppendingState<K, N, IN, SV, OUT>, MergingState<IN, OUT> {
 
 	/**
 	 * Merges the state of the current key for the given source namespaces into the state of

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
index 76fa58f..f7bff2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
@@ -24,8 +24,9 @@ import org.apache.flink.api.common.state.ReducingState;
  * The peer to the {@link ReducingState} in the internal state type hierarchy.
  * 
  * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
- * 
+ *
+ * @param <K> The type of key the state is associated to
  * @param <N> The type of the namespace
  * @param <T> The type of elements in the aggregated by the ReduceFunction
  */
-public interface InternalReducingState<N, T> extends InternalMergingState<N, T, T>, ReducingState<T> {}
+public interface InternalReducingState<K, N, T> extends InternalMergingState<K, N, T, T, T>, ReducingState<T> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java
index 7177b8a..169cdba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java
@@ -24,8 +24,9 @@ import org.apache.flink.api.common.state.ValueState;
  * The peer to the {@link ValueState} in the internal state type hierarchy.
  * 
  * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
- * 
+ *
+ * @param <K> The type of key the state is associated to
  * @param <N> The type of the namespace
  * @param <T> The type of elements in the list
  */
-public interface InternalValueState<N, T> extends InternalKvState<N>, ValueState<T> {}
+public interface InternalValueState<K, N, T> extends InternalKvState<K, N, T>, ValueState<T> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
index 43aa1d1..36a85d1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
@@ -19,27 +19,126 @@
 package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link KvStateRegistry}.
  */
 public class KvStateRegistryTest extends TestLogger {
 
+	@Test
+	public void testKvStateEntry() throws InterruptedException {
+		final int threads = 10;
+
+		final CountDownLatch latch1 = new CountDownLatch(threads);
+		final CountDownLatch latch2 = new CountDownLatch(1);
+
+		final List<KvStateInfo<?, ?, ?>> infos = Collections.synchronizedList(new ArrayList<>());
+
+		final JobID jobID = new JobID();
+
+		final JobVertexID jobVertexId = new JobVertexID();
+		final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
+		final String registrationName = "foobar";
+
+		final KvStateRegistry kvStateRegistry = new KvStateRegistry();
+		final KvStateID stateID = kvStateRegistry.registerKvState(
+				jobID,
+				jobVertexId,
+				keyGroupRange,
+				registrationName,
+				new DummyKvState()
+		);
+
+		final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>();
+
+		for (int i = 0; i < threads; i++) {
+			new Thread(() -> {
+				final KvStateEntry<?, ?, ?> kvState = kvStateRegistry.getKvState(stateID);
+				final KvStateInfo<?, ?, ?> stateInfo = kvState.getInfoForCurrentThread();
+				infos.add(stateInfo);
+
+				latch1.countDown();
+				try {
+					latch2.await();
+				} catch (InterruptedException e) {
+					// compare and set, so that we do not overwrite an exception
+					// that was (potentially) already encountered.
+					exceptionHolder.compareAndSet(null, e);
+				}
+
+			}).start();
+		}
+
+		latch1.await();
+
+		final KvStateEntry<?, ?, ?> kvState = kvStateRegistry.getKvState(stateID);
+
+		// verify that every thread has added its state info and that the entry caches one info per thread.
+		Assert.assertEquals(threads, infos.size());
+		Assert.assertEquals(threads, kvState.getCacheSize());
+
+		latch2.countDown();
+
+		for (KvStateInfo<?, ?, ?> infoA: infos) {
+			boolean instanceAlreadyFound = false;
+			for (KvStateInfo<?, ?, ?> infoB: infos) {
+				if (infoA == infoB) {
+					if (instanceAlreadyFound) {
+						Assert.fail("More than one thread sharing the same serializer instance.");
+					}
+					instanceAlreadyFound = true;
+				} else {
+					Assert.assertEquals(infoA, infoB);
+				}
+			}
+		}
+
+		kvStateRegistry.unregisterKvState(
+				jobID,
+				jobVertexId,
+				keyGroupRange,
+				registrationName,
+				stateID);
+
+		Assert.assertEquals(0L, kvState.getCacheSize());
+
+		Throwable t = exceptionHolder.get();
+		if (t != null) {
+			fail(t.getMessage());
+		}
+	}
+
 	/**
 	 * Tests that {@link KvStateRegistryListener} only receive the notifications which
 	 * are destined for them.
@@ -74,7 +173,7 @@ public class KvStateRegistryTest extends TestLogger {
 			jobVertexId,
 			keyGroupRange,
 			registrationName,
-			new DummyKvState<>());
+			new DummyKvState());
 
 		assertThat(registeredNotifications1.poll(), equalTo(jobId1));
 		assertThat(registeredNotifications2.isEmpty(), is(true));
@@ -87,7 +186,7 @@ public class KvStateRegistryTest extends TestLogger {
 			jobVertexId2,
 			keyGroupRange2,
 			registrationName2,
-			new DummyKvState<>());
+			new DummyKvState());
 
 		assertThat(registeredNotifications2.poll(), equalTo(jobId2));
 		assertThat(registeredNotifications1.isEmpty(), is(true));
@@ -191,18 +290,35 @@ public class KvStateRegistryTest extends TestLogger {
 
 	/**
 	 * Testing implementation of {@link InternalKvState}.
-	 *
-	 * @param <T> type of the state
 	 */
-	private static final class DummyKvState<T> implements InternalKvState<T> {
+	private static class DummyKvState implements InternalKvState<Integer, VoidNamespace, String> {
 
 		@Override
-		public void setCurrentNamespace(Object namespace) {
+		public TypeSerializer<Integer> getKeySerializer() {
+			return IntSerializer.INSTANCE;
+		}
+
+		@Override
+		public TypeSerializer<VoidNamespace> getNamespaceSerializer() {
+			return VoidNamespaceSerializer.INSTANCE;
+		}
+
+		@Override
+		public TypeSerializer<String> getValueSerializer() {
+			return new DeepCopyingStringSerializer();
+		}
+
+		@Override
+		public void setCurrentNamespace(VoidNamespace namespace) {
 			// noop
 		}
 
 		@Override
-		public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
+		public byte[] getSerializedValue(
+				final byte[] serializedKeyAndNamespace,
+				final TypeSerializer<Integer> safeKeySerializer,
+				final TypeSerializer<VoidNamespace> safeNamespaceSerializer,
+				final TypeSerializer<String> safeValueSerializer) throws Exception {
 			return serializedKeyAndNamespace;
 		}
 
@@ -212,4 +328,86 @@ public class KvStateRegistryTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * A dummy serializer whose {@code duplicate()} always returns a new instance.
+	 */
+	private static class DeepCopyingStringSerializer extends TypeSerializer<String> {
+
+		private static final long serialVersionUID = -3744051158625555607L;
+
+		@Override
+		public boolean isImmutableType() {
+			return false;
+		}
+
+		@Override
+		public TypeSerializer<String> duplicate() {
+			return new DeepCopyingStringSerializer();
+		}
+
+		@Override
+		public String createInstance() {
+			return null;
+		}
+
+		@Override
+		public String copy(String from) {
+			return null;
+		}
+
+		@Override
+		public String copy(String from, String reuse) {
+			return null;
+		}
+
+		@Override
+		public int getLength() {
+			return 0;
+		}
+
+		@Override
+		public void serialize(String record, DataOutputView target) throws IOException {
+
+		}
+
+		@Override
+		public String deserialize(DataInputView source) throws IOException {
+			return null;
+		}
+
+		@Override
+		public String deserialize(String reuse, DataInputView source) throws IOException {
+			return null;
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj instanceof DeepCopyingStringSerializer;
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return true;
+		}
+
+		@Override
+		public int hashCode() {
+			return 0;
+		}
+
+		@Override
+		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+			return null;
+		}
+
+		@Override
+		public CompatibilityResult<String> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+			return null;
+		}
+	}
 }


Mime
View raw message