From commits-return-16669-archive-asf-public=cust-asf.ponee.io@flink.apache.org Thu Mar 29 17:59:44 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id A459518077D for ; Thu, 29 Mar 2018 17:59:42 +0200 (CEST) Received: (qmail 36506 invoked by uid 500); 29 Mar 2018 15:59:41 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 36286 invoked by uid 99); 29 Mar 2018 15:59:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 29 Mar 2018 15:59:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1610BDF965; Thu, 29 Mar 2018 15:59:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kkloudas@apache.org To: commits@flink.apache.org Date: Thu, 29 Mar 2018 15:59:45 -0000 Message-Id: In-Reply-To: <9988e96f0a1d48608a2d81df0633c1ac@git.apache.org> References: <9988e96f0a1d48608a2d81df0633c1ac@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/5] flink git commit: [FLINK-8802] [QS] Fix concurrent access to non-duplicated serializers. [FLINK-8802] [QS] Fix concurrent access to non-duplicated serializers. This closes #5691. 
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db8e1f09 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db8e1f09 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db8e1f09 Branch: refs/heads/master Commit: db8e1f09bd7dcd9f392bf987e96cddcb34665b6c Parents: c16e2c9 Author: kkloudas Authored: Fri Mar 9 22:47:35 2018 +0100 Committer: kkloudas Committed: Thu Mar 29 17:35:39 2018 +0200 ---------------------------------------------------------------------- .../flink/queryablestate/network/Client.java | 2 +- .../server/KvStateServerHandler.java | 26 ++- .../queryablestate/network/ClientTest.java | 4 +- .../KVStateRequestSerializerRocksDBTest.java | 16 +- .../network/KvStateRequestSerializerTest.java | 30 ++- .../network/KvStateServerHandlerTest.java | 46 +++- .../flink/runtime/query/KvStateEntry.java | 75 +++++++ .../apache/flink/runtime/query/KvStateInfo.java | 114 ++++++++++ .../flink/runtime/query/KvStateRegistry.java | 21 +- .../runtime/query/TaskKvStateRegistry.java | 4 +- .../state/AbstractKeyedStateBackend.java | 23 +- .../state/heap/AbstractHeapMergingState.java | 9 +- .../runtime/state/heap/AbstractHeapState.java | 31 ++- .../state/heap/HeapAggregatingState.java | 19 +- .../runtime/state/heap/HeapFoldingState.java | 20 +- .../state/heap/HeapKeyedStateBackend.java | 16 +- .../flink/runtime/state/heap/HeapListState.java | 47 +++- .../flink/runtime/state/heap/HeapMapState.java | 55 +++-- .../runtime/state/heap/HeapReducingState.java | 20 +- .../runtime/state/heap/HeapValueState.java | 20 +- .../internal/InternalAggregatingState.java | 13 +- .../state/internal/InternalAppendingState.java | 12 +- .../state/internal/InternalFoldingState.java | 5 +- .../runtime/state/internal/InternalKvState.java | 37 +++- .../state/internal/InternalListState.java | 6 +- .../state/internal/InternalMapState.java | 5 +- .../state/internal/InternalMergingState.java | 12 +- 
.../state/internal/InternalReducingState.java | 5 +- .../state/internal/InternalValueState.java | 5 +- .../runtime/query/KvStateRegistryTest.java | 212 ++++++++++++++++++- .../runtime/state/StateBackendTestBase.java | 96 +++++---- .../state/StateSnapshotCompressionTest.java | 4 +- ...pKeyedStateBackendSnapshotMigrationTest.java | 2 +- .../streaming/state/AbstractRocksDBState.java | 58 +++-- .../state/RocksDBAggregatingState.java | 19 +- .../streaming/state/RocksDBFoldingState.java | 20 +- .../state/RocksDBKeyedStateBackend.java | 12 +- .../streaming/state/RocksDBListState.java | 19 +- .../streaming/state/RocksDBMapState.java | 152 +++++++++---- .../streaming/state/RocksDBReducingState.java | 19 +- .../streaming/state/RocksDBValueState.java | 19 +- .../windowing/EvictingWindowOperator.java | 4 +- .../operators/windowing/WindowOperator.java | 14 +- .../operators/windowing/TriggerTestHarness.java | 2 +- 44 files changed, 1074 insertions(+), 276 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java index 364f835..6b60492 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java @@ -281,7 +281,7 @@ public class Client { * @param request the request to be sent. 
* @return Future holding the serialized result */ - public CompletableFuture sendRequest(REQ request) { + CompletableFuture sendRequest(REQ request) { synchronized (connectLock) { if (failureCause != null) { return FutureUtils.getFailedFuture(failureCause); http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java index 18a2944..d46deff 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java @@ -26,6 +26,8 @@ import org.apache.flink.queryablestate.messages.KvStateResponse; import org.apache.flink.queryablestate.network.AbstractServerHandler; import org.apache.flink.queryablestate.network.messages.MessageSerializer; import org.apache.flink.queryablestate.network.stats.KvStateRequestStats; +import org.apache.flink.runtime.query.KvStateEntry; +import org.apache.flink.runtime.query.KvStateInfo; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.util.ExceptionUtils; @@ -33,9 +35,6 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.concurrent.CompletableFuture; /** @@ -50,8 +49,6 @@ import java.util.concurrent.CompletableFuture; 
@ChannelHandler.Sharable public class KvStateServerHandler extends AbstractServerHandler { - private static final Logger LOG = LoggerFactory.getLogger(KvStateServerHandler.class); - /** KvState registry holding references to the KvState instances. */ private final KvStateRegistry registry; @@ -78,13 +75,13 @@ public class KvStateServerHandler extends AbstractServerHandler responseFuture = new CompletableFuture<>(); try { - final InternalKvState kvState = registry.getKvState(request.getKvStateId()); + final KvStateEntry kvState = registry.getKvState(request.getKvStateId()); if (kvState == null) { responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId())); } else { byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace(); - byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace); + byte[] serializedResult = getSerializedValue(kvState, serializedKeyAndNamespace); if (serializedResult != null) { responseFuture.complete(new KvStateResponse(serializedResult)); } else { @@ -100,6 +97,21 @@ public class KvStateServerHandler extends AbstractServerHandler byte[] getSerializedValue( + final KvStateEntry entry, + final byte[] serializedKeyAndNamespace) throws Exception { + + final InternalKvState state = entry.getState(); + final KvStateInfo infoForCurrentThread = entry.getInfoForCurrentThread(); + + return state.getSerializedValue( + serializedKeyAndNamespace, + infoForCurrentThread.getKeySerializer(), + infoForCurrentThread.getNamespaceSerializer(), + infoForCurrentThread.getStateValueSerializer() + ); + } + @Override public CompletableFuture shutdown() { return CompletableFuture.completedFuture(null); http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java ---------------------------------------------------------------------- diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java index 6aa4710..bceb361 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java @@ -685,8 +685,8 @@ public class ClientTest { state.update(201 + i); - // we know it must be a KvStat but this is not exposed to the user via State - InternalKvState kvState = (InternalKvState) state; + // we know it must be a KvState but this is not exposed to the user via State + InternalKvState kvState = (InternalKvState) state; // Register KvState (one state instance for all server) ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState); http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java index dd75dd6..4985bf3 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java @@ -41,6 +41,7 @@ import 
org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; import java.io.File; +import java.util.Map; import static org.mockito.Mockito.mock; @@ -82,7 +83,7 @@ public final class KVStateRequestSerializerRocksDBTest { } @Override - public InternalListState createListState( + public InternalListState createListState( final TypeSerializer namespaceSerializer, final ListStateDescriptor stateDesc) throws Exception { @@ -120,7 +121,7 @@ public final class KVStateRequestSerializerRocksDBTest { longHeapKeyedStateBackend.restore(null); longHeapKeyedStateBackend.setCurrentKey(key); - final InternalListState listState = longHeapKeyedStateBackend + final InternalListState listState = longHeapKeyedStateBackend .createListState(VoidNamespaceSerializer.INSTANCE, new ListStateDescriptor<>("test", LongSerializer.INSTANCE)); @@ -159,11 +160,12 @@ public final class KVStateRequestSerializerRocksDBTest { longHeapKeyedStateBackend.restore(null); longHeapKeyedStateBackend.setCurrentKey(key); - final InternalMapState mapState = (InternalMapState) - longHeapKeyedStateBackend.getPartitionedState( - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE, - new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE)); + final InternalMapState> mapState = + (InternalMapState>) + longHeapKeyedStateBackend.getPartitionedState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE)); KvStateRequestSerializerTest.testMapSerialization(key, mapState); longHeapKeyedStateBackend.dispose(); http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java ---------------------------------------------------------------------- diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java index 8d10141..dac1b90 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java @@ -200,7 +200,7 @@ public class KvStateRequestSerializerTest { ); longHeapKeyedStateBackend.setCurrentKey(key); - final InternalListState listState = longHeapKeyedStateBackend.createListState( + final InternalListState listState = longHeapKeyedStateBackend.createListState( VoidNamespaceSerializer.INSTANCE, new ListStateDescriptor<>("test", LongSerializer.INSTANCE)); @@ -220,7 +220,7 @@ public class KvStateRequestSerializerTest { */ public static void testListSerialization( final long key, - final InternalListState listState) throws Exception { + final InternalListState listState) throws Exception { TypeSerializer valueSerializer = LongSerializer.INSTANCE; listState.setCurrentNamespace(VoidNamespace.INSTANCE); @@ -240,7 +240,11 @@ public class KvStateRequestSerializerTest { key, LongSerializer.INSTANCE, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE); - final byte[] serializedValues = listState.getSerializedValue(serializedKey); + final byte[] serializedValues = listState.getSerializedValue( + serializedKey, + listState.getKeySerializer(), + listState.getNamespaceSerializer(), + listState.getValueSerializer()); List actualValues = KvStateSerializer.deserializeList(serializedValues, valueSerializer); assertEquals(expectedValues, actualValues); @@ -303,10 +307,12 @@ public class KvStateRequestSerializerTest { ); longHeapKeyedStateBackend.setCurrentKey(key); - 
final InternalMapState mapState = (InternalMapState) longHeapKeyedStateBackend.getPartitionedState( - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE, - new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE)); + final InternalMapState> mapState = + (InternalMapState>) + longHeapKeyedStateBackend.getPartitionedState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE)); testMapSerialization(key, mapState); } @@ -322,9 +328,9 @@ public class KvStateRequestSerializerTest { * * @throws Exception */ - public static void testMapSerialization( + public static > void testMapSerialization( final long key, - final InternalMapState mapState) throws Exception { + final InternalMapState mapState) throws Exception { TypeSerializer userKeySerializer = LongSerializer.INSTANCE; TypeSerializer userValueSerializer = StringSerializer.INSTANCE; @@ -348,7 +354,11 @@ public class KvStateRequestSerializerTest { key, LongSerializer.INSTANCE, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE); - final byte[] serializedValues = mapState.getSerializedValue(serializedKey); + final byte[] serializedValues = mapState.getSerializedValue( + serializedKey, + mapState.getKeySerializer(), + mapState.getNamespaceSerializer(), + mapState.getValueSerializer()); Map actualValues = KvStateSerializer.deserializeMap(serializedValues, userKeySerializer, userValueSerializer); assertEquals(expectedValues.size(), actualValues.size()); http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java index 8b1517c..9947dac 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java @@ -21,7 +21,9 @@ package org.apache.flink.queryablestate.network; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; import org.apache.flink.queryablestate.KvStateID; @@ -70,9 +72,6 @@ import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; /** * Tests for {@link KvStateServerHandler}. @@ -286,7 +285,7 @@ public class KvStateServerHandlerTest extends TestLogger { } /** - * Tests the failure response on a failure on the {@link InternalKvState#getSerializedValue(byte[])} call. + * Tests the failure response on a failure on the {@link InternalKvState#getSerializedValue(byte[], TypeSerializer, TypeSerializer, TypeSerializer)} call. 
*/ @Test public void testFailureOnGetSerializedValue() throws Exception { @@ -300,9 +299,42 @@ public class KvStateServerHandlerTest extends TestLogger { EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); // Failing KvState - InternalKvState kvState = mock(InternalKvState.class); - when(kvState.getSerializedValue(any(byte[].class))) - .thenThrow(new RuntimeException("Expected test Exception")); + InternalKvState kvState = + new InternalKvState() { + @Override + public TypeSerializer getKeySerializer() { + return IntSerializer.INSTANCE; + } + + @Override + public TypeSerializer getNamespaceSerializer() { + return VoidNamespaceSerializer.INSTANCE; + } + + @Override + public TypeSerializer getValueSerializer() { + return LongSerializer.INSTANCE; + } + + @Override + public void setCurrentNamespace(VoidNamespace namespace) { + // do nothing + } + + @Override + public byte[] getSerializedValue( + final byte[] serializedKeyAndNamespace, + final TypeSerializer safeKeySerializer, + final TypeSerializer safeNamespaceSerializer, + final TypeSerializer safeValueSerializer) throws Exception { + throw new RuntimeException("Expected test Exception"); + } + + @Override + public void clear() { + + } + }; KvStateID kvStateId = registry.registerKvState( new JobID(), http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java new file mode 100644 index 0000000..0bd132f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.query; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * An entry holding the {@link InternalKvState} along with its {@link KvStateInfo}. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace the state is associated to + * @param The type of values kept internally in state + */ +@Internal +public class KvStateEntry { + + private final InternalKvState state; + private final KvStateInfo stateInfo; + + private final boolean areSerializersStateless; + + private final ConcurrentMap> serializerCache; + + public KvStateEntry(final InternalKvState state) { + this.state = Preconditions.checkNotNull(state); + this.stateInfo = new KvStateInfo<>( + state.getKeySerializer(), + state.getNamespaceSerializer(), + state.getValueSerializer() + ); + this.serializerCache = new ConcurrentHashMap<>(); + this.areSerializersStateless = stateInfo.duplicate() == stateInfo; + } + + public InternalKvState getState() { + return state; + } + + public KvStateInfo getInfoForCurrentThread() { + return areSerializersStateless + ? stateInfo + : serializerCache.computeIfAbsent(Thread.currentThread(), t -> stateInfo.duplicate()); + } + + public void clear() { + serializerCache.clear(); + } + + @VisibleForTesting + public int getCacheSize() { + return serializerCache.size(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateInfo.java new file mode 100644 index 0000000..aa94e41 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateInfo.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.query; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; + +import java.util.Objects; + +/** + * Metadata about a {@link InternalKvState}. This includes the serializers for + * the key, the namespace, and the values kept in the state. + * + * @param The type of key the state is associated to + * @param The type of the namespace the state is associated to + * @param The type of values kept internally in state + */ +public class KvStateInfo { + + private final TypeSerializer keySerializer; + private final TypeSerializer namespaceSerializer; + private final TypeSerializer stateValueSerializer; + + public KvStateInfo( + final TypeSerializer keySerializer, + final TypeSerializer namespaceSerializer, + final TypeSerializer stateValueSerializer + ) { + this.keySerializer = Preconditions.checkNotNull(keySerializer); + this.namespaceSerializer = Preconditions.checkNotNull(namespaceSerializer); + this.stateValueSerializer = Preconditions.checkNotNull(stateValueSerializer); + } + + /** + * @return The serializer for the key the state is associated to. + */ + public TypeSerializer getKeySerializer() { + return keySerializer; + } + + /** + * @return The serializer for the namespace the state is associated to. 
+ */ + public TypeSerializer getNamespaceSerializer() { + return namespaceSerializer; + } + + /** + * @return The serializer for the values kept in the state. + */ + public TypeSerializer getStateValueSerializer() { + return stateValueSerializer; + } + + /** + * Creates a deep copy of the current {@link KvStateInfo} by duplicating + * all the included serializers. + * + *

This method assumes correct implementation of the {@link TypeSerializer#duplicate()} + * method of the included serializers. + */ + public KvStateInfo duplicate() { + final TypeSerializer dupKeySerializer = keySerializer.duplicate(); + final TypeSerializer dupNamespaceSerializer = namespaceSerializer.duplicate(); + final TypeSerializer dupSVSerializer = stateValueSerializer.duplicate(); + + if ( + dupKeySerializer == keySerializer && + dupNamespaceSerializer == namespaceSerializer && + dupSVSerializer == stateValueSerializer + ) { + return this; + } + + return new KvStateInfo<>(dupKeySerializer, dupNamespaceSerializer, dupSVSerializer); + + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + KvStateInfo stateInfo = (KvStateInfo) o; + return Objects.equals(keySerializer, stateInfo.keySerializer) && + Objects.equals(namespaceSerializer, stateInfo.namespaceSerializer) && + Objects.equals(stateValueSerializer, stateInfo.stateValueSerializer); + } + + @Override + public int hashCode() { + return Objects.hash(keySerializer, namespaceSerializer, stateValueSerializer); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java index 2c55463..63d3c52 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java @@ -41,8 +41,7 @@ import java.util.concurrent.ConcurrentHashMap; public class KvStateRegistry { /** All registered KvState instances. 
*/ - private final ConcurrentHashMap> registeredKvStates = - new ConcurrentHashMap<>(); + private final ConcurrentHashMap> registeredKvStates = new ConcurrentHashMap<>(4); /** Registry listeners to be notified on registration/unregistration. */ private final ConcurrentHashMap listeners = new ConcurrentHashMap<>(4); @@ -86,11 +85,11 @@ public class KvStateRegistry { JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName, - InternalKvState kvState) { + InternalKvState kvState) { KvStateID kvStateId = new KvStateID(); - if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) { + if (registeredKvStates.putIfAbsent(kvStateId, new KvStateEntry<>(kvState)) == null) { final KvStateRegistryListener listener = getKvStateRegistryListener(jobId); if (listener != null) { @@ -123,7 +122,10 @@ public class KvStateRegistry { String registrationName, KvStateID kvStateId) { - if (registeredKvStates.remove(kvStateId) != null) { + KvStateEntry entry = registeredKvStates.remove(kvStateId); + if (entry != null) { + entry.clear(); + final KvStateRegistryListener listener = getKvStateRegistryListener(jobId); if (listener != null) { listener.notifyKvStateUnregistered( @@ -136,13 +138,13 @@ public class KvStateRegistry { } /** - * Returns the KvState instance identified by the given KvStateID or - * null if none is registered. + * Returns the {@link KvStateEntry} containing the requested instance as identified by the + * given KvStateID, along with its {@link KvStateInfo} or null if none is registered. 
* * @param kvStateId KvStateID to identify the KvState instance - * @return KvState instance identified by the KvStateID or null + * @return The {@link KvStateEntry} instance identified by the KvStateID or null if there is none */ - public InternalKvState getKvState(KvStateID kvStateId) { + public KvStateEntry getKvState(KvStateID kvStateId) { return registeredKvStates.get(kvStateId); } @@ -174,5 +176,4 @@ public class KvStateRegistry { } return listener; } - } http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java index f799b5a..a44a508 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java @@ -60,7 +60,7 @@ public class TaskKvStateRegistry { * descriptor used to create the KvState instance) * @param kvState The */ - public void registerKvState(KeyGroupRange keyGroupRange, String registrationName, InternalKvState kvState) { + public void registerKvState(KeyGroupRange keyGroupRange, String registrationName, InternalKvState kvState) { KvStateID kvStateId = registry.registerKvState(jobId, jobVertexId, keyGroupRange, registrationName, kvState); registeredKvStates.add(new KvStateInfo(keyGroupRange, registrationName, kvStateId)); } @@ -85,7 +85,7 @@ public class TaskKvStateRegistry { private final KvStateID kvStateId; - public KvStateInfo(KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId) { + KvStateInfo(KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId) { this.keyGroupRange = keyGroupRange; this.registrationName = registrationName; this.kvStateId = 
kvStateId; http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index e7b3a1a..287474c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -53,6 +53,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.Collection; import java.util.HashMap; +import java.util.Map; import java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -79,7 +80,7 @@ public abstract class AbstractKeyedStateBackend implements private int currentKeyGroup; /** So that we can give out state when the user uses the same key. */ - protected final HashMap> keyValueStatesByName; + protected final HashMap> keyValueStatesByName; /** For caching the last accessed partitioned state */ private String lastName; @@ -161,7 +162,7 @@ public abstract class AbstractKeyedStateBackend implements * @param The type of the namespace. * @param The type of the value that the {@code ValueState} can store. */ - protected abstract InternalValueState createValueState( + protected abstract InternalValueState createValueState( TypeSerializer namespaceSerializer, ValueStateDescriptor stateDesc) throws Exception; @@ -174,7 +175,7 @@ public abstract class AbstractKeyedStateBackend implements * @param The type of the namespace. * @param The type of the values that the {@code ListState} can store. 
*/ - protected abstract InternalListState createListState( + protected abstract InternalListState createListState( TypeSerializer namespaceSerializer, ListStateDescriptor stateDesc) throws Exception; @@ -187,7 +188,7 @@ public abstract class AbstractKeyedStateBackend implements * @param The type of the namespace. * @param The type of the values that the {@code ListState} can store. */ - protected abstract InternalReducingState createReducingState( + protected abstract InternalReducingState createReducingState( TypeSerializer namespaceSerializer, ReducingStateDescriptor stateDesc) throws Exception; @@ -200,7 +201,7 @@ public abstract class AbstractKeyedStateBackend implements * @param The type of the namespace. * @param The type of the values that the {@code ListState} can store. */ - protected abstract InternalAggregatingState createAggregatingState( + protected abstract InternalAggregatingState createAggregatingState( TypeSerializer namespaceSerializer, AggregatingStateDescriptor stateDesc) throws Exception; @@ -217,7 +218,7 @@ public abstract class AbstractKeyedStateBackend implements * @deprecated will be removed in a future version */ @Deprecated - protected abstract InternalFoldingState createFoldingState( + protected abstract InternalFoldingState createFoldingState( TypeSerializer namespaceSerializer, FoldingStateDescriptor stateDesc) throws Exception; @@ -231,7 +232,7 @@ public abstract class AbstractKeyedStateBackend implements * @param Type of the keys in the state * @param Type of the values in the state * */ - protected abstract InternalMapState createMapState( + protected abstract InternalMapState> createMapState( TypeSerializer namespaceSerializer, MapStateDescriptor stateDesc) throws Exception; @@ -336,7 +337,7 @@ public abstract class AbstractKeyedStateBackend implements stateDescriptor.initializeSerializerUnlessSet(executionConfig); } - InternalKvState existing = keyValueStatesByName.get(stateDescriptor.getName()); + InternalKvState existing = 
keyValueStatesByName.get(stateDescriptor.getName()); if (existing != null) { @SuppressWarnings("unchecked") S typedState = (S) existing; @@ -379,7 +380,7 @@ public abstract class AbstractKeyedStateBackend implements }); @SuppressWarnings("unchecked") - InternalKvState kvState = (InternalKvState) state; + InternalKvState kvState = (InternalKvState) state; keyValueStatesByName.put(stateDescriptor.getName(), kvState); // Publish queryable state @@ -416,7 +417,7 @@ public abstract class AbstractKeyedStateBackend implements return (S) lastState; } - InternalKvState previous = keyValueStatesByName.get(stateDescriptor.getName()); + InternalKvState previous = keyValueStatesByName.get(stateDescriptor.getName()); if (previous != null) { lastState = previous; lastState.setCurrentNamespace(namespace); @@ -425,7 +426,7 @@ public abstract class AbstractKeyedStateBackend implements } final S state = getOrCreateKeyedState(namespaceSerializer, stateDescriptor); - final InternalKvState kvState = (InternalKvState) state; + final InternalKvState kvState = (InternalKvState) state; lastName = stateDescriptor.getName(); lastState = kvState; http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java index 3e76423..df762b4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java @@ -28,18 +28,19 @@ import org.apache.flink.runtime.state.internal.InternalMergingState; import java.util.Collection; /** - * Base class for {@link MergingState} ({@link 
org.apache.flink.runtime.state.internal.InternalMergingState}) - * that is stored on the heap. + * Base class for {@link MergingState} ({@link InternalMergingState}) that is stored on the heap. * * @param The type of the key. * @param The type of the namespace. + * @param The type of the input elements. * @param The type of the values in the state. + * @param The type of the output elements. * @param The type of State * @param The type of StateDescriptor for the State S */ -public abstract class AbstractHeapMergingState> +public abstract class AbstractHeapMergingState> extends AbstractHeapState - implements InternalMergingState { + implements InternalMergingState { /** * The merge transformation function that implements the merge logic. http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java index 7f629ae..e889c53 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java @@ -37,8 +37,7 @@ import org.apache.flink.util.Preconditions; * @param The type of State * @param The type of StateDescriptor for the State S */ -public abstract class AbstractHeapState> - implements InternalKvState { +public abstract class AbstractHeapState> implements InternalKvState { /** Map containing the actual key/value pairs. 
*/ protected final StateTable stateTable; @@ -86,28 +85,26 @@ public abstract class AbstractHeapState safeKeySerializer, + final TypeSerializer safeNamespaceSerializer, + final TypeSerializer safeValueSerializer) throws Exception { - Tuple2 keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace( - serializedKeyAndNamespace, keySerializer, namespaceSerializer); - - return getSerializedValue(keyAndNamespace.f0, keyAndNamespace.f1); - } + Preconditions.checkNotNull(serializedKeyAndNamespace); + Preconditions.checkNotNull(safeKeySerializer); + Preconditions.checkNotNull(safeNamespaceSerializer); + Preconditions.checkNotNull(safeValueSerializer); - public byte[] getSerializedValue(K key, N namespace) throws Exception { - Preconditions.checkState(namespace != null, "No namespace given."); - Preconditions.checkState(key != null, "No key given."); + Tuple2 keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace( + serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer); - SV result = stateTable.get(key, namespace); + SV result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1); if (result == null) { return null; } - - @SuppressWarnings("unchecked,rawtypes") - TypeSerializer serializer = stateDesc.getSerializer(); - return KvStateSerializer.serializeValue(result, serializer); + return KvStateSerializer.serializeValue(result, safeValueSerializer); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java index 6dd5cec..8e58ac8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java @@ -39,8 +39,8 @@ import java.io.IOException; * @param The type of the value returned from the state. */ public class HeapAggregatingState - extends AbstractHeapMergingState, AggregatingStateDescriptor> - implements InternalAggregatingState { + extends AbstractHeapMergingState, AggregatingStateDescriptor> + implements InternalAggregatingState { private final AggregateTransformation aggregateTransformation; @@ -64,6 +64,21 @@ public class HeapAggregatingState this.aggregateTransformation = new AggregateTransformation<>(stateDesc.getAggregateFunction()); } + @Override + public TypeSerializer getKeySerializer() { + return keySerializer; + } + + @Override + public TypeSerializer getNamespaceSerializer() { + return namespaceSerializer; + } + + @Override + public TypeSerializer getValueSerializer() { + return stateDesc.getSerializer(); + } + // ------------------------------------------------------------------------ // state access // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java index 3a77cca..ed1d0de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java @@ -29,8 +29,7 @@ import org.apache.flink.util.Preconditions; import java.io.IOException; /** - * Heap-backed partitioned {@link FoldingState} that is - * snapshotted into files. + * Heap-backed partitioned {@link FoldingState} that is snapshotted into files. 
* * @param The type of the key. * @param The type of the namespace. @@ -42,7 +41,7 @@ import java.io.IOException; @Deprecated public class HeapFoldingState extends AbstractHeapState, FoldingStateDescriptor> - implements InternalFoldingState { + implements InternalFoldingState { /** The function used to fold the state */ private final FoldTransformation foldTransformation; @@ -63,6 +62,21 @@ public class HeapFoldingState this.foldTransformation = new FoldTransformation<>(stateDesc); } + @Override + public TypeSerializer getKeySerializer() { + return keySerializer; + } + + @Override + public TypeSerializer getNamespaceSerializer() { + return namespaceSerializer; + } + + @Override + public TypeSerializer getValueSerializer() { + return stateDesc.getSerializer(); + } + // ------------------------------------------------------------------------ // state access // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 63eb33b..82f883c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -92,8 +92,7 @@ import java.util.stream.Stream; /** * A {@link AbstractKeyedStateBackend} that keeps state on the Java Heap and will serialize state to - * streams provided by a {@link org.apache.flink.runtime.state.CheckpointStreamFactory} upon - * checkpointing. + * streams provided by a {@link CheckpointStreamFactory} upon checkpointing. * * @param The key by which state is keyed. 
*/ @@ -247,7 +246,7 @@ public class HeapKeyedStateBackend extends AbstractKeyedStateBackend { } @Override - public InternalValueState createValueState( + public InternalValueState createValueState( TypeSerializer namespaceSerializer, ValueStateDescriptor stateDesc) throws Exception { @@ -256,7 +255,7 @@ public class HeapKeyedStateBackend extends AbstractKeyedStateBackend { } @Override - public InternalListState createListState( + public InternalListState createListState( TypeSerializer namespaceSerializer, ListStateDescriptor stateDesc) throws Exception { @@ -265,7 +264,7 @@ public class HeapKeyedStateBackend extends AbstractKeyedStateBackend { } @Override - public InternalReducingState createReducingState( + public InternalReducingState createReducingState( TypeSerializer namespaceSerializer, ReducingStateDescriptor stateDesc) throws Exception { @@ -274,7 +273,7 @@ public class HeapKeyedStateBackend extends AbstractKeyedStateBackend { } @Override - public InternalAggregatingState createAggregatingState( + public InternalAggregatingState createAggregatingState( TypeSerializer namespaceSerializer, AggregatingStateDescriptor stateDesc) throws Exception { @@ -283,7 +282,7 @@ public class HeapKeyedStateBackend extends AbstractKeyedStateBackend { } @Override - public InternalFoldingState createFoldingState( + public InternalFoldingState createFoldingState( TypeSerializer namespaceSerializer, FoldingStateDescriptor stateDesc) throws Exception { @@ -292,7 +291,8 @@ public class HeapKeyedStateBackend extends AbstractKeyedStateBackend { } @Override - public InternalMapState createMapState(TypeSerializer namespaceSerializer, + protected InternalMapState> createMapState( + TypeSerializer namespaceSerializer, MapStateDescriptor stateDesc) throws Exception { StateTable> stateTable = tryRegisterStateTable( http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java index f7b5cd2..bd68560 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java @@ -21,7 +21,10 @@ package org.apache.flink.runtime.state.heap; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.util.Preconditions; @@ -30,16 +33,15 @@ import java.util.ArrayList; import java.util.List; /** - * Heap-backed partitioned {@link org.apache.flink.api.common.state.ListState} that is snapshotted - * into files. + * Heap-backed partitioned {@link ListState} that is snapshotted into files. * * @param The type of the key. * @param The type of the namespace. * @param The type of the value. */ public class HeapListState - extends AbstractHeapMergingState, List, ListState, ListStateDescriptor> - implements InternalListState { + extends AbstractHeapMergingState, Iterable, ListState, ListStateDescriptor> + implements InternalListState { /** * Creates a new key/value state for the given hash map of key/value pairs. 
@@ -56,6 +58,21 @@ public class HeapListState super(stateDesc, stateTable, keySerializer, namespaceSerializer); } + @Override + public TypeSerializer getKeySerializer() { + return keySerializer; + } + + @Override + public TypeSerializer getNamespaceSerializer() { + return namespaceSerializer; + } + + @Override + public TypeSerializer> getValueSerializer() { + return stateDesc.getSerializer(); + } + // ------------------------------------------------------------------------ // state access // ------------------------------------------------------------------------ @@ -82,24 +99,34 @@ public class HeapListState } @Override - public byte[] getSerializedValue(K key, N namespace) throws Exception { - Preconditions.checkState(namespace != null, "No namespace given."); - Preconditions.checkState(key != null, "No key given."); + public byte[] getSerializedValue( + final byte[] serializedKeyAndNamespace, + final TypeSerializer safeKeySerializer, + final TypeSerializer safeNamespaceSerializer, + final TypeSerializer> safeValueSerializer) throws Exception { + + Preconditions.checkNotNull(serializedKeyAndNamespace); + Preconditions.checkNotNull(safeKeySerializer); + Preconditions.checkNotNull(safeNamespaceSerializer); + Preconditions.checkNotNull(safeValueSerializer); + + Tuple2 keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace( + serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer); - List result = stateTable.get(key, namespace); + List result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1); if (result == null) { return null; } - TypeSerializer serializer = stateDesc.getElementSerializer(); + final TypeSerializer dupSerializer = ((ListSerializer) safeValueSerializer).getElementSerializer(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos); // write the same as RocksDB writes lists, with one ',' separator for (int i = 0; i < result.size(); i++) { - 
serializer.serialize(result.get(i), view); + dupSerializer.serialize(result.get(i), view); if (i < result.size() -1) { view.writeByte(','); } http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java index 206f10a..7c18071 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java @@ -21,11 +21,12 @@ package org.apache.flink.runtime.state.heap; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; +import org.apache.flink.runtime.state.HashMapSerializer; import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.util.Preconditions; -import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -33,14 +34,14 @@ import java.util.Map; /** * Heap-backed partitioned {@link MapState} that is snapshotted into files. * - * @param The type of the key. - * @param The type of the namespace. + * @param The type of the key. + * @param The type of the namespace. * @param The type of the keys in the state. * @param The type of the values in the state. */ public class HeapMapState extends AbstractHeapState, MapState, MapStateDescriptor> - implements InternalMapState { + implements InternalMapState> { /** * Creates a new key/value state for the given hash map of key/value pairs. 
@@ -58,6 +59,24 @@ public class HeapMapState } @Override + public TypeSerializer getKeySerializer() { + return keySerializer; + } + + @Override + public TypeSerializer getNamespaceSerializer() { + return namespaceSerializer; + } + + @Override + public TypeSerializer> getValueSerializer() { + return new HashMapSerializer<>( + stateDesc.getKeySerializer(), + stateDesc.getValueSerializer() + ); + } + + @Override public UV get(UK userKey) { HashMap userMap = stateTable.get(currentNamespace); @@ -140,19 +159,31 @@ public class HeapMapState } @Override - public byte[] getSerializedValue(K key, N namespace) throws IOException { - Preconditions.checkState(namespace != null, "No namespace given."); - Preconditions.checkState(key != null, "No key given."); + public byte[] getSerializedValue( + final byte[] serializedKeyAndNamespace, + final TypeSerializer safeKeySerializer, + final TypeSerializer safeNamespaceSerializer, + final TypeSerializer> safeValueSerializer) throws Exception { - HashMap result = stateTable.get(key, namespace); + Preconditions.checkNotNull(serializedKeyAndNamespace); + Preconditions.checkNotNull(safeKeySerializer); + Preconditions.checkNotNull(safeNamespaceSerializer); + Preconditions.checkNotNull(safeValueSerializer); - if (null == result) { + Tuple2 keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace( + serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer); + + Map result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1); + + if (result == null) { return null; } - TypeSerializer userKeySerializer = stateDesc.getKeySerializer(); - TypeSerializer userValueSerializer = stateDesc.getValueSerializer(); + final HashMapSerializer serializer = (HashMapSerializer) safeValueSerializer; + + final TypeSerializer dupUserKeySerializer = serializer.getKeySerializer(); + final TypeSerializer dupUserValueSerializer = serializer.getValueSerializer(); - return KvStateSerializer.serializeMap(result.entrySet(), userKeySerializer, 
userValueSerializer); + return KvStateSerializer.serializeMap(result.entrySet(), dupUserKeySerializer, dupUserValueSerializer); } } http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java index 6e11327..58b3128 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java @@ -29,8 +29,7 @@ import org.apache.flink.util.Preconditions; import java.io.IOException; /** - * Heap-backed partitioned {@link org.apache.flink.api.common.state.ReducingState} that is - * snapshotted into files. + * Heap-backed partitioned {@link ReducingState} that is snapshotted into files. * * @param The type of the key. * @param The type of the namespace. 
@@ -38,7 +37,7 @@ import java.io.IOException; */ public class HeapReducingState extends AbstractHeapMergingState, ReducingStateDescriptor> - implements InternalReducingState { + implements InternalReducingState { private final ReduceTransformation reduceTransformation; @@ -59,6 +58,21 @@ public class HeapReducingState this.reduceTransformation = new ReduceTransformation<>(stateDesc.getReduceFunction()); } + @Override + public TypeSerializer getKeySerializer() { + return keySerializer; + } + + @Override + public TypeSerializer getNamespaceSerializer() { + return namespaceSerializer; + } + + @Override + public TypeSerializer getValueSerializer() { + return stateDesc.getSerializer(); + } + // ------------------------------------------------------------------------ // state access // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java index 6de62a8..bf0a3cf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java @@ -24,8 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.state.internal.InternalValueState; /** - * Heap-backed partitioned {@link org.apache.flink.api.common.state.ValueState} that is snapshotted - * into files. + * Heap-backed partitioned {@link ValueState} that is snapshotted into files. * * @param The type of the key. * @param The type of the namespace. 
@@ -33,7 +32,7 @@ import org.apache.flink.runtime.state.internal.InternalValueState; */ public class HeapValueState extends AbstractHeapState, ValueStateDescriptor> - implements InternalValueState { + implements InternalValueState { /** * Creates a new key/value state for the given hash map of key/value pairs. @@ -51,6 +50,21 @@ public class HeapValueState } @Override + public TypeSerializer getKeySerializer() { + return keySerializer; + } + + @Override + public TypeSerializer getNamespaceSerializer() { + return namespaceSerializer; + } + + @Override + public TypeSerializer getValueSerializer() { + return stateDesc.getSerializer(); + } + + @Override public V value() { final V result = stateTable.get(currentNamespace); http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java index 15a8e31..b66404c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java @@ -24,10 +24,11 @@ import org.apache.flink.api.common.state.AggregatingState; * The peer to the {@link AggregatingState} in the internal state type hierarchy. * *

See {@link InternalKvState} for a description of the internal state hierarchy. - * - * @param The type of the namespace - * @param Type of the value added to the state. - * @param Type of the value extracted from the state. + * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the value added to the state + * @param The type of elements in the state + * @param Type of the value extracted from the state */ -public interface InternalAggregatingState - extends InternalMergingState, AggregatingState {} +public interface InternalAggregatingState extends InternalMergingState, AggregatingState {} http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java index ae9f457..3cb84af 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java @@ -24,9 +24,11 @@ import org.apache.flink.api.common.state.AppendingState; * The peer to the {@link AppendingState} in the internal state type hierarchy. * *

See {@link InternalKvState} for a description of the internal state hierarchy. - * - * @param The type of the namespace - * @param The type of elements added to the state - * @param The type of the + * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param The type of elements added to the state + * @param The type of elements in the state + * @param The type of the resulting element in the state */ -public interface InternalAppendingState extends InternalKvState, AppendingState {} +public interface InternalAppendingState extends InternalKvState, AppendingState {} http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java index 4ef258f..ed53d82 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java @@ -24,7 +24,8 @@ import org.apache.flink.api.common.state.FoldingState; * The peer to the {@link FoldingState} in the internal state type hierarchy. * *

See {@link InternalKvState} for a description of the internal state hierarchy. - * + * + * @param The type of key the state is associated to * @param The type of the namespace * @param Type of the values folded into the state * @param Type of the value in the state @@ -32,4 +33,4 @@ import org.apache.flink.api.common.state.FoldingState; * @deprecated will be removed in a future version */ @Deprecated -public interface InternalFoldingState extends InternalAppendingState, FoldingState {} +public interface InternalFoldingState extends InternalAppendingState, FoldingState {} http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java index 06f64b6..1310dd2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.state.internal; import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.typeutils.TypeSerializer; /** * The {@code InternalKvState} is the root of the internal state type hierarchy, similar to the @@ -52,10 +53,27 @@ import org.apache.flink.api.common.state.State; * | | * +---------InternalReducingState * - * - * @param The type of the namespace. + * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param The type of values kept internally in state */ -public interface InternalKvState extends State { +public interface InternalKvState extends State { + + /** + * Returns the {@link TypeSerializer} for the type of key this state is associated to. 
+ */ + TypeSerializer getKeySerializer(); + + /** + * Returns the {@link TypeSerializer} for the type of namespace this state is associated to. + */ + TypeSerializer getNamespaceSerializer(); + + /** + * Returns the {@link TypeSerializer} for the type of value this state holds. + */ + TypeSerializer getValueSerializer(); /** * Sets the current namespace, which will be used when using the state access methods. @@ -70,10 +88,21 @@ public interface InternalKvState extends State { *

If no value is associated with key and namespace, null * is returned. * + *

TO IMPLEMENTERS: This method is called by multiple threads. Anything + * stateful (e.g. serializers) should be either duplicated or protected from undesired + * consequences of concurrent invocations. + * * @param serializedKeyAndNamespace Serialized key and namespace + * @param safeKeySerializer A key serializer which is safe to be used even in multi-threaded context + * @param safeNamespaceSerializer A namespace serializer which is safe to be used even in multi-threaded context + * @param safeValueSerializer A value serializer which is safe to be used even in multi-threaded context * @return Serialized value or null if no value is associated with the key and namespace. * * @throws Exception Exceptions during serialization are forwarded */ - byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception; + byte[] getSerializedValue( + final byte[] serializedKeyAndNamespace, + final TypeSerializer safeKeySerializer, + final TypeSerializer safeNamespaceSerializer, + final TypeSerializer safeValueSerializer) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java index 1e22dc6..1d6653b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java @@ -26,11 +26,13 @@ import java.util.List; * The peer to the {@link ListState} in the internal state type hierarchy. * *

<p>See {@link InternalKvState} for a description of the internal state hierarchy. - * + * + * @param <K> The type of key the state is associated to * @param <N> The type of the namespace * @param <T> The type of elements in the list */ -public interface InternalListState<N, T> extends InternalMergingState<N, T, Iterable<T>>, ListState<T> { +public interface InternalListState<K, N, T> extends InternalMergingState<K, N, T, List<T>, Iterable<T>>, ListState<T> { + /** * Updates the operator state accessible by {@link #get()} by updating existing values to * to the given list of values. The next time {@link #get()} is called (for the same state http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java index f2a7b41..91f698c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java @@ -20,13 +20,16 @@ package org.apache.flink.runtime.state.internal; import org.apache.flink.api.common.state.MapState; +import java.util.Map; + /** * The peer to the {@link MapState} in the internal state type hierarchy. * *

See {@link InternalKvState} for a description of the internal state hierarchy. * + * @param The type of key the state is associated to * @param The type of the namespace * @param Type of the values folded into the state * @param Type of the value in the state */ -public interface InternalMapState extends InternalKvState, MapState {} +public interface InternalMapState> extends InternalKvState, MapState {} http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java index abc7d7c..2c72697 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java @@ -26,12 +26,14 @@ import java.util.Collection; * The peer to the {@link MergingState} in the internal state type hierarchy. * * See {@link InternalKvState} for a description of the internal state hierarchy. 
- * - * @param <N> The type of the namespace - * @param <IN> The type of elements added to the state - * @param <OUT> The type of elements + * + * @param <K> The type of key the state is associated to + * @param <N> The type of the namespace + * @param <IN> The type of elements added to the state + * @param <SV> The type of elements in the state + * @param <OUT> The type of elements */ -public interface InternalMergingState<N, IN, OUT> extends InternalAppendingState<N, IN, OUT>, MergingState<IN, OUT> { +public interface InternalMergingState<K, N, IN, SV, OUT> extends InternalAppendingState<K, N, IN, SV, OUT>, MergingState<IN, OUT> { /** * Merges the state of the current key for the given source namespaces into the state of http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java index 76fa58f..f7bff2e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java @@ -24,8 +24,9 @@ import org.apache.flink.api.common.state.ReducingState; * The peer to the {@link ReducingState} in the internal state type hierarchy. * *

<p>See {@link InternalKvState} for a description of the internal state hierarchy. - * + * + * @param <K> The type of key the state is associated to * @param <N> The type of the namespace * @param <T> The type of elements in the aggregated by the ReduceFunction */ -public interface InternalReducingState<N, T> extends InternalMergingState<N, T, T>, ReducingState<T> {} +public interface InternalReducingState<K, N, T> extends InternalMergingState<K, N, T, T, T>, ReducingState<T> {} http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java index 7177b8a..169cdba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java @@ -24,8 +24,9 @@ import org.apache.flink.api.common.state.ValueState; * The peer to the {@link ValueState} in the internal state type hierarchy. * *

<p>See {@link InternalKvState} for a description of the internal state hierarchy. - * + * + * @param <K> The type of key the state is associated to * @param <N> The type of the namespace * @param <T> The type of elements in the list */ -public interface InternalValueState<N, T> extends InternalKvState<N>, ValueState<T> {} +public interface InternalValueState<K, N, T> extends InternalKvState<K, N, T>, ValueState<T> {} http://git-wip-us.apache.org/repos/asf/flink/blob/db8e1f09/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java index 43aa1d1..36a85d1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java @@ -19,27 +19,126 @@ package org.apache.flink.runtime.query; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.util.TestLogger; +import org.junit.Assert; import org.junit.Test; +import java.io.IOException; import
java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Queue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; /** * Tests for the {@link KvStateRegistry}. */ public class KvStateRegistryTest extends TestLogger { + @Test + public void testKvStateEntry() throws InterruptedException { + final int threads = 10; + + final CountDownLatch latch1 = new CountDownLatch(threads); + final CountDownLatch latch2 = new CountDownLatch(1); + + final List> infos = Collections.synchronizedList(new ArrayList<>()); + + final JobID jobID = new JobID(); + + final JobVertexID jobVertexId = new JobVertexID(); + final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1); + final String registrationName = "foobar"; + + final KvStateRegistry kvStateRegistry = new KvStateRegistry(); + final KvStateID stateID = kvStateRegistry.registerKvState( + jobID, + jobVertexId, + keyGroupRange, + registrationName, + new DummyKvState() + ); + + final AtomicReference exceptionHolder = new AtomicReference<>(); + + for (int i = 0; i < threads; i++) { + new Thread(() -> { + final KvStateEntry kvState = kvStateRegistry.getKvState(stateID); + final KvStateInfo stateInfo = kvState.getInfoForCurrentThread(); + infos.add(stateInfo); + + latch1.countDown(); + try { + latch2.await(); + } catch (InterruptedException e) { + // compare and set, so that we do not overwrite an exception + // that was (potentially) already encountered. + exceptionHolder.compareAndSet(null, e); + } + + }).start(); + } + + latch1.await(); + + final KvStateEntry kvState = kvStateRegistry.getKvState(stateID); + + // verify that all the threads are done correctly. 
+ Assert.assertEquals(threads, infos.size()); + Assert.assertEquals(threads, kvState.getCacheSize()); + + latch2.countDown(); + + for (KvStateInfo infoA: infos) { + boolean instanceAlreadyFound = false; + for (KvStateInfo infoB: infos) { + if (infoA == infoB) { + if (instanceAlreadyFound) { + Assert.fail("More than one thread sharing the same serializer instance."); + } + instanceAlreadyFound = true; + } else { + Assert.assertEquals(infoA, infoB); + } + } + } + + kvStateRegistry.unregisterKvState( + jobID, + jobVertexId, + keyGroupRange, + registrationName, + stateID); + + Assert.assertEquals(0L, kvState.getCacheSize()); + + Throwable t = exceptionHolder.get(); + if (t != null) { + fail(t.getMessage()); + } + } + /** * Tests that {@link KvStateRegistryListener} only receive the notifications which * are destined for them. @@ -74,7 +173,7 @@ public class KvStateRegistryTest extends TestLogger { jobVertexId, keyGroupRange, registrationName, - new DummyKvState<>()); + new DummyKvState()); assertThat(registeredNotifications1.poll(), equalTo(jobId1)); assertThat(registeredNotifications2.isEmpty(), is(true)); @@ -87,7 +186,7 @@ public class KvStateRegistryTest extends TestLogger { jobVertexId2, keyGroupRange2, registrationName2, - new DummyKvState<>()); + new DummyKvState()); assertThat(registeredNotifications2.poll(), equalTo(jobId2)); assertThat(registeredNotifications1.isEmpty(), is(true)); @@ -191,18 +290,35 @@ public class KvStateRegistryTest extends TestLogger { /** * Testing implementation of {@link InternalKvState}. 
- * - * @param type of the state */ - private static final class DummyKvState implements InternalKvState { + private static class DummyKvState implements InternalKvState { @Override - public void setCurrentNamespace(Object namespace) { + public TypeSerializer getKeySerializer() { + return IntSerializer.INSTANCE; + } + + @Override + public TypeSerializer getNamespaceSerializer() { + return VoidNamespaceSerializer.INSTANCE; + } + + @Override + public TypeSerializer getValueSerializer() { + return new DeepCopyingStringSerializer(); + } + + @Override + public void setCurrentNamespace(VoidNamespace namespace) { // noop } @Override - public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception { + public byte[] getSerializedValue( + final byte[] serializedKeyAndNamespace, + final TypeSerializer safeKeySerializer, + final TypeSerializer safeNamespaceSerializer, + final TypeSerializer safeValueSerializer) throws Exception { return serializedKeyAndNamespace; } @@ -212,4 +328,86 @@ public class KvStateRegistryTest extends TestLogger { } } + /** + * A dummy serializer that just returns another instance when .duplicate(). 
+ */ + private static class DeepCopyingStringSerializer extends TypeSerializer { + + private static final long serialVersionUID = -3744051158625555607L; + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer duplicate() { + return new DeepCopyingStringSerializer(); + } + + @Override + public String createInstance() { + return null; + } + + @Override + public String copy(String from) { + return null; + } + + @Override + public String copy(String from, String reuse) { + return null; + } + + @Override + public int getLength() { + return 0; + } + + @Override + public void serialize(String record, DataOutputView target) throws IOException { + + } + + @Override + public String deserialize(DataInputView source) throws IOException { + return null; + } + + @Override + public String deserialize(String reuse, DataInputView source) throws IOException { + return null; + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + + } + + @Override + public boolean equals(Object obj) { + return obj instanceof DeepCopyingStringSerializer; + } + + @Override + public boolean canEqual(Object obj) { + return true; + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + return null; + } + + @Override + public CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + return null; + } + } }