From: srichter@apache.org
To: commits@flink.apache.org
Date: Fri, 24 Mar 2017 17:51:45 -0000
Subject: [3/3] flink git commit: [FLINK-5715] Asynchronous snapshots for heap-based keyed state backend (backport from 1.3)

[FLINK-5715] Asynchronous snapshots for heap-based keyed state backend (backport from 1.3)

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6a80725
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6a80725
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6a80725

Branch: refs/heads/release-1.2
Commit: c6a80725053c49dd2064405577291bdc86c82003
Parents: b703a24
Author: Stefan Richter
Authored: Thu Mar 23 11:36:56 2017 +0100
Committer: Stefan Richter
Committed: Fri Mar 24 18:51:19 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/util/MathUtils.java   |   47 +-
 .../state/AbstractKeyedStateBackend.java        |   18 +-
 .../state/StateTransformationFunction.java      |   42 +
 .../filesystem/async/AsyncFsStateBackend.java   |  266 +++++
 .../heap/async/AbstractHeapMergingState.java    |  104 ++
 .../state/heap/async/AbstractHeapState.java     |  119 ++
 .../heap/async/AbstractStateTableSnapshot.java  |   51 +
 .../heap/async/AsyncHeapKeyedStateBackend.java  |  433 +++++++
 .../state/heap/async/CopyOnWriteStateTable.java | 1066 ++++++++++++++++++
 .../async/CopyOnWriteStateTableSnapshot.java    |  188 +++
 .../state/heap/async/HeapFoldingState.java      |   99 ++
 .../runtime/state/heap/async/HeapListState.java |  122 ++
 .../state/heap/async/HeapReducingState.java     |  107 ++
 .../state/heap/async/HeapValueState.java        |   73 ++
 .../state/heap/async/InternalKeyContext.java    |   60 +
 .../runtime/state/heap/async/StateEntry.java    |   44 +
 .../runtime/state/heap/async/StateTable.java    |  189 ++++
 .../heap/async/StateTableByKeyGroupReader.java  |   38 +
 .../heap/async/StateTableByKeyGroupReaders.java |  136 +++
 .../state/heap/async/StateTableSnapshot.java    |   45 +
 .../memory/async/AsyncMemoryStateBackend.java   |   94 ++
 .../state/AsyncFileStateBackendTest.java        |  213 ++++
 .../state/AsyncMemoryStateBackendTest.java      |  197 ++++
 .../runtime/state/MemoryStateBackendTest.java   |    2 +-
 .../runtime/state/StateBackendTestBase.java     |  150 +++
 .../heap/async/CopyOnWriteStateTableTest.java   |  486 ++++++++
 .../state/heap/async/HeapListStateTest.java     |  238 ++++
 .../state/heap/async/HeapReducingStateTest.java |  236 ++++
 .../heap/async/HeapStateBackendTestBase.java    |   37 +
 .../util/BlockerCheckpointStreamFactory.java    |  118 ++
 .../api/windowing/windows/TimeWindow.java       |   49 +-
 ...tractEventTimeWindowCheckpointingITCase.java |   14 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |   26 +
 ...ckendEventTimeWindowCheckpointingITCase.java |   26 +
 34 files changed, 5106 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
index 074e8ae..4c52b6e 100644
--- a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
@@ -22,13 +22,13 @@ package org.apache.flink.util;
  * Collection of simple mathematical routines.
  */
 public final class MathUtils {
-	
+
 	/**
 	 * Computes the logarithm of the given value to the base of 2, rounded down. It corresponds to the
 	 * position of the highest non-zero bit. The position is counted, starting with 0 from the least
 	 * significant bit to the most significant bit. For example, log2floor(16) = 4, and
 	 * log2floor(10) = 3.
-	 * 
+	 *
 	 * @param value The value to compute the logarithm for.
 	 * @return The logarithm (rounded down) to the base of 2.
 	 * @throws ArithmeticException Thrown, if the given value is zero.
@@ -40,11 +40,11 @@ public final class MathUtils {
 		return 31 - Integer.numberOfLeadingZeros(value);
 	}
-	
+
 	/**
 	 * Computes the logarithm of the given value to the base of 2. This method throws an error,
 	 * if the given argument is not a power of 2.
-	 * 
+	 *
 	 * @param value The value to compute the logarithm for.
 	 * @return The logarithm to the base of 2.
 	 * @throws ArithmeticException Thrown, if the given value is zero.
@@ -59,25 +59,25 @@ public final class MathUtils {
 		}
 		return 31 - Integer.numberOfLeadingZeros(value);
 	}
-	
+
 	/**
 	 * Decrements the given number down to the closest power of two. If the argument is a
 	 * power of two, it remains unchanged.
-	 * 
+	 *
 	 * @param value The value to round down.
 	 * @return The closest value that is a power of two and less or equal than the given value.
 	 */
 	public static int roundDownToPowerOf2(int value) {
 		return Integer.highestOneBit(value);
 	}
-	
+
 	/**
 	 * Casts the given value to a 32 bit integer, if it can be safely done. If the cast would change the numeric
 	 * value, this method raises an exception.
 	 *
 	 * This method is a protection in places where one expects to be able to safely cast, but where unexpected
 	 * situations could make the cast unsafe and would cause hidden problems that are hard to track down.
-	 * 
+	 *
 	 * @param value The value to be cast to an integer.
 	 * @return The given value as an integer.
 	 * @see Math#toIntExact(long)
@@ -172,8 +172,37 @@ public final class MathUtils {
 		return x + 1;
 	}
 
+	/**
+	 * Pseudo-randomly maps a long (64-bit) to an integer (32-bit) using some bit-mixing for better distribution.
+	 *
+	 * @param in the long (64-bit) input.
+	 * @return the bit-mixed int (32-bit) output
+	 */
+	public static int longToIntWithBitMixing(long in) {
+		in = (in ^ (in >>> 30)) * 0xbf58476d1ce4e5b9L;
+		in = (in ^ (in >>> 27)) * 0x94d049bb133111ebL;
+		in = in ^ (in >>> 31);
+		return (int) in;
+	}
+
+	/**
+	 * Bit-mixing for pseudo-randomization of integers (e.g., to guard against bad hash functions). Implementation is
+	 * from Murmur's 32 bit finalizer.
+	 *
+	 * @param in the input value
+	 * @return the bit-mixed output value
+	 */
+	public static int bitMix(int in) {
+		in ^= in >>> 16;
+		in *= 0x85ebca6b;
+		in ^= in >>> 13;
+		in *= 0xc2b2ae35;
+		in ^= in >>> 16;
+		return in;
+	}
+
 	// ============================================================================================
-	
+
 	/**
 	 * Prevent Instantiation through private constructor.
 	 */
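The two mixers above exist because the new copy-on-write state table cannot trust user hashCode() implementations to be well distributed. A minimal sketch of the intended effect (the class name and scenario are illustrative, not part of the commit): hash codes that differ only in their high bits all collide under a plain power-of-two mask, while the mixed hashes spread out.

    import org.apache.flink.util.MathUtils;

    public class BitMixSketch {

        public static void main(String[] args) {
            // A power-of-two hash table masks with (length - 1), so hash codes
            // that differ only in their high bits would all land in bucket 0
            // without mixing.
            final int tableLength = 16;
            for (int i = 0; i < 4; i++) {
                int rawHash = i << 28;                            // only high bits vary
                int naiveBucket = rawHash & (tableLength - 1);    // always 0
                int mixedBucket = MathUtils.bitMix(rawHash) & (tableLength - 1);
                System.out.println("naive=" + naiveBucket + " mixed=" + mixedBucket);
            }

            // longToIntWithBitMixing() condenses a 64-bit value into a
            // well-mixed 32-bit one, e.g. to derive an int hash from a long key.
            System.out.println(MathUtils.longToIntWithBitMixing(42L));
        }
    }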
http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 2daf896..23c9a49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -35,6 +35,8 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.heap.async.AbstractHeapMergingState;
+import org.apache.flink.runtime.state.heap.async.InternalKeyContext;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Closeable;
@@ -51,7 +53,7 @@ import java.util.List;
  * @param Type of the key by which state is keyed.
  */
 public abstract class AbstractKeyedStateBackend
-		implements KeyedStateBackend, Snapshotable, Closeable {
+		implements KeyedStateBackend, Snapshotable, Closeable, InternalKeyContext {
 
 	/** {@link TypeSerializer} for our key. */
 	protected final TypeSerializer keySerializer;
 
@@ -205,6 +207,7 @@ public abstract class AbstractKeyedStateBackend
 	/**
 	 * @see KeyedStateBackend
 	 */
+	@Override
 	public KeyGroupRange getKeyGroupRange() {
 		return keyGroupRange;
 	}
@@ -293,10 +296,16 @@ public abstract class AbstractKeyedStateBackend
 	@Override
 	@SuppressWarnings("unchecked,rawtypes")
 	public > void mergePartitionedStates(final N target, Collection sources, final TypeSerializer namespaceSerializer, final StateDescriptor stateDescriptor) throws Exception {
-		if (stateDescriptor instanceof ReducingStateDescriptor) {
+
+		State stateRef = getPartitionedState(target, namespaceSerializer, stateDescriptor);
+		if (stateRef instanceof AbstractHeapMergingState) {
+
+			((AbstractHeapMergingState) stateRef).mergeNamespaces(target, sources);
+		} else if (stateDescriptor instanceof ReducingStateDescriptor) {
+
 			ReducingStateDescriptor reducingStateDescriptor = (ReducingStateDescriptor) stateDescriptor;
+			ReducingState state = (ReducingState) stateRef;
 			ReduceFunction reduceFn = reducingStateDescriptor.getReduceFunction();
-			ReducingState state = (ReducingState) getPartitionedState(target, namespaceSerializer, stateDescriptor);
 			KvState kvState = (KvState) state;
 			Object result = null;
 			for (N source: sources) {
@@ -314,7 +323,8 @@ public abstract class AbstractKeyedStateBackend
 				state.add(result);
 			}
 		} else if (stateDescriptor instanceof ListStateDescriptor) {
-			ListState state = (ListState) getPartitionedState(target, namespaceSerializer, stateDescriptor);
+
+			ListState state = (ListState) stateRef;
 			KvState kvState = (KvState) state;
 			List result = new ArrayList<>();
 			for (N source: sources) {
http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
new file mode 100644
index 0000000..182b4c8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateTransformationFunction.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for a binary function that is used for push-down of state transformation into state backends. The
+ * function takes as inputs the old state and an element. From those inputs, the function computes the new state.
+ *
+ * @param <S> type of the previous state that is the basis for the computation of the new state.
+ * @param <T> type of the element value that is used to compute the change of state.
+ */
+@Internal
+public interface StateTransformationFunction<S, T> {
+
+	/**
+	 * Binary function that applies a given value to the given old state to compute the new state.
+	 *
+	 * @param previousState the previous state that is the basis for the transformation.
+	 * @param value the value that the implementation applies to the old state to obtain the new state.
+	 * @return the new state, computed by applying the given value on the given old state.
+	 * @throws Exception if something goes wrong in applying the transformation function.
+	 */
+	S apply(S previousState, T value) throws Exception;
+}
\ No newline at end of file
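To make the contract concrete, here is a minimal sketch of an implementation; the class name and the summing use case are illustrative only, while the in-tree example is the MergeTransformation inner class of AbstractHeapMergingState further below. The point of the interface is that a state table can apply such a function atomically to the entry for a key/namespace pair, touching the entry once instead of a separate get-modify-put sequence.

    import org.apache.flink.runtime.state.StateTransformationFunction;

    /**
     * Folds a new element into a running sum, treating absent state as zero.
     */
    public class SumTransformation implements StateTransformationFunction<Long, Long> {

        @Override
        public Long apply(Long previousState, Long value) {
            return previousState == null ? value : previousState + value;
        }
    }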
http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/async/AsyncFsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/async/AsyncFsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/async/AsyncFsStateBackend.java
new file mode 100644
index 0000000..d90ffbd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/async/AsyncFsStateBackend.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem.async;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
+import org.apache.flink.runtime.state.heap.async.AsyncHeapKeyedStateBackend;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * The file state backend is a state backend that stores the state of streaming jobs in a file system.
+ *
+ *
The state backend has one core directory into which it puts all checkpoint data. Inside that + * directory, it creates a directory per job, inside which each checkpoint gets a directory, with + * files for each state, for example: + * + * {@code hdfs://namenode:port/flink-checkpoints//chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8 } + */ +public class AsyncFsStateBackend extends AbstractStateBackend { + + private static final long serialVersionUID = -8191916350224044011L; + + private static final Logger LOG = LoggerFactory.getLogger(AsyncFsStateBackend.class); + + /** By default, state smaller than 1024 bytes will not be written to files, but + * will be stored directly with the metadata */ + public static final int DEFAULT_FILE_STATE_THRESHOLD = 1024; + + /** Maximum size of state that is stored with the metadata, rather than in files */ + private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024; + + /** The path to the directory for the checkpoint data, including the file system + * description via scheme and optional authority */ + private final Path basePath; + + /** State below this size will be stored as part of the metadata, rather than in files */ + private final int fileStateThreshold; + + /** + * Creates a new state backend that stores its checkpoint data in the file system and location + * defined by the given URI. + * + *
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') + * must be accessible via {@link FileSystem#get(URI)}. + * + *
For a state backend targeting HDFS, this means that the URI must either specify the authority + * (host and port), or that the Hadoop configuration that describes that information must be in the + * classpath. + * + * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), + * and the path to the checkpoint data directory. + * @throws IOException Thrown, if no file system can be found for the scheme in the URI. + */ + public AsyncFsStateBackend(String checkpointDataUri) throws IOException { + this(new Path(checkpointDataUri)); + } + + /** + * Creates a new state backend that stores its checkpoint data in the file system and location + * defined by the given URI. + * + *
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') + * must be accessible via {@link FileSystem#get(URI)}. + * + *
For a state backend targeting HDFS, this means that the URI must either specify the authority + * (host and port), or that the Hadoop configuration that describes that information must be in the + * classpath. + * + * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), + * and the path to the checkpoint data directory. + * @throws IOException Thrown, if no file system can be found for the scheme in the URI. + */ + public AsyncFsStateBackend(Path checkpointDataUri) throws IOException { + this(checkpointDataUri.toUri()); + } + + /** + * Creates a new state backend that stores its checkpoint data in the file system and location + * defined by the given URI. + * + *
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') + * must be accessible via {@link FileSystem#get(URI)}. + * + *
For a state backend targeting HDFS, this means that the URI must either specify the authority + * (host and port), or that the Hadoop configuration that describes that information must be in the + * classpath. + * + * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), + * and the path to the checkpoint data directory. + * @throws IOException Thrown, if no file system can be found for the scheme in the URI. + */ + public AsyncFsStateBackend(URI checkpointDataUri) throws IOException { + this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD); + } + + /** + * Creates a new state backend that stores its checkpoint data in the file system and location + * defined by the given URI. + * + *
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') + * must be accessible via {@link FileSystem#get(URI)}. + * + *
For a state backend targeting HDFS, this means that the URI must either specify the authority + * (host and port), or that the Hadoop configuration that describes that information must be in the + * classpath. + * + * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), + * and the path to the checkpoint data directory. + * @param fileStateSizeThreshold State up to this size will be stored as part of the metadata, + * rather than in files + * + * @throws IOException Thrown, if no file system can be found for the scheme in the URI. + */ + public AsyncFsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) throws IOException { + if (fileStateSizeThreshold < 0) { + throw new IllegalArgumentException("The threshold for file state size must be zero or larger."); + } + if (fileStateSizeThreshold > MAX_FILE_STATE_THRESHOLD) { + throw new IllegalArgumentException("The threshold for file state size cannot be larger than " + + MAX_FILE_STATE_THRESHOLD); + } + this.fileStateThreshold = fileStateSizeThreshold; + + this.basePath = validateAndNormalizeUri(checkpointDataUri); + } + + /** + * Gets the base directory where all state-containing files are stored. + * The job specific directory is created inside this directory. + * + * @return The base directory. + */ + public Path getBasePath() { + return basePath; + } + + // ------------------------------------------------------------------------ + // initialization and cleanup + // ------------------------------------------------------------------------ + + @Override + public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException { + return new FsCheckpointStreamFactory(basePath, jobId, fileStateThreshold); + } + + @Override + public AbstractKeyedStateBackend createKeyedStateBackend( + Environment env, + JobID jobID, + String operatorIdentifier, + TypeSerializer keySerializer, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + TaskKvStateRegistry kvStateRegistry) throws Exception { + return new AsyncHeapKeyedStateBackend<>( + kvStateRegistry, + keySerializer, + env.getUserClassLoader(), + numberOfKeyGroups, + keyGroupRange); + } + + @Override + public String toString() { + return "File State Backend @ " + basePath; + } + + /** + * Checks and normalizes the checkpoint data URI. This method first checks the validity of the + * URI (scheme, path, availability of a matching file system) and then normalizes the URI + * to a path. + * + *
If the URI does not include an authority, but the file system configured for the URI has an
+	 * authority, then the normalized path will include this authority.
+	 *
+	 * @param checkpointDataUri The URI to check and normalize.
+	 * @return A normalized URI as a Path.
+	 *
+	 * @throws IllegalArgumentException Thrown, if the URI misses scheme or path.
+	 * @throws IOException Thrown, if no file system can be found for the URI's scheme.
+	 */
+	public static Path validateAndNormalizeUri(URI checkpointDataUri) throws IOException {
+		final String scheme = checkpointDataUri.getScheme();
+		final String path = checkpointDataUri.getPath();
+
+		// some validity checks
+		if (scheme == null) {
+			throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
+					"Please specify the file system scheme explicitly in the URI.");
+		}
+		if (path == null) {
+			throw new IllegalArgumentException("The path to store the checkpoint data in is null. " +
+					"Please specify a directory path for the checkpoint data.");
+		}
+		if (path.length() == 0 || path.equals("/")) {
+			throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
+		}
+
+		if (!FileSystem.isFlinkSupportedScheme(checkpointDataUri.getScheme())) {
+			// skip verification checks for non-flink supported filesystem
+			// this is because the required filesystem classes may not be available to the flink client
+			return new Path(checkpointDataUri);
+		} else {
+			// we do a bit of work to make sure that the URI for the filesystem refers to exactly the same
+			// (distributed) filesystem on all hosts and includes full host/port information, even if the
+			// original URI did not include that. We count on the filesystem loading from the configuration
+			// to fill in the missing data.
+
+			// try to grab the file system for this path/URI
+			FileSystem filesystem = FileSystem.get(checkpointDataUri);
+			if (filesystem == null) {
+				String reason = "Could not find a file system for the given scheme in " +
+						"the available configurations.";
+				LOG.warn("Could not verify checkpoint path. This might be caused by a genuine " +
+						"problem or by the fact that the file system is not accessible from the " +
+						"client. Reason: {}", reason);
+				return new Path(checkpointDataUri);
+			}
+
+			URI fsURI = filesystem.getUri();
+			try {
+				URI baseURI = new URI(fsURI.getScheme(), fsURI.getAuthority(), path, null, null);
+				return new Path(baseURI);
+			} catch (URISyntaxException e) {
+				String reason = String.format(
+						"Cannot create file system URI for checkpointDataUri %s and filesystem URI %s: " + e.toString(),
+						checkpointDataUri,
+						fsURI);
+				LOG.warn("Could not verify checkpoint path. This might be caused by a genuine " +
+						"problem or by the fact that the file system is not accessible from the " +
+						"client. Reason: {}", reason);
+				return new Path(checkpointDataUri);
+			}
+		}
+	}
+}
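For orientation, this is how the new backend would be hooked into a streaming job on the 1.2 line. A minimal sketch, assuming the usual StreamExecutionEnvironment#setStateBackend(AbstractStateBackend) hook; the checkpoint URI is a placeholder:

    import org.apache.flink.runtime.state.filesystem.async.AsyncFsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class AsyncFsBackendSetup {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // The URI must carry a scheme; the authority may be omitted when the
            // Hadoop configuration on the classpath provides host and port.
            env.setStateBackend(new AsyncFsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

            // Snapshots of the heap state tables are then written out asynchronously
            // on each checkpoint instead of blocking the processing thread.
            env.enableCheckpointing(10000L);

            // ... define and execute the topology ...
        }
    }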
http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapMergingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapMergingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapMergingState.java
new file mode 100644
index 0000000..1b09d9c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapMergingState.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.async;
+
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+
+import java.util.Collection;
+
+/**
+ * Base class for {@link MergingState} that is stored on the heap.
+ *
+ * @param The type of the key.
+ * @param The type of the namespace.
+ * @param The type of the values in the state.
+ * @param The type of State
+ * @param The type of StateDescriptor for the State S
+ */
+public abstract class AbstractHeapMergingState>
+		extends AbstractHeapState {
+
+	/**
+	 * The merge transformation function that implements the merge logic.
+	 */
+	private final MergeTransformation mergeTransformation;
+
+	/**
+	 * Creates a new key/value state for the given hash map of key/value pairs.
+	 *
+	 * @param stateDesc The state identifier for the state. This contains name
+	 * and can create a default state value.
+	 * @param stateTable The state table to use in this key/value state. May contain initial state.
+ */ + protected AbstractHeapMergingState( + SD stateDesc, + StateTable stateTable, + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer) { + + super(stateDesc, stateTable, keySerializer, namespaceSerializer); + this.mergeTransformation = new MergeTransformation(); + } + + public void mergeNamespaces(N target, Collection sources) throws Exception { + if (sources == null || sources.isEmpty()) { + return; // nothing to do + } + + final StateTable map = stateTable; + + SV merged = null; + + // merge the sources + for (N source : sources) { + + // get and remove the next source per namespace/key + SV sourceState = map.removeAndGetOld(source); + + if (merged != null && sourceState != null) { + merged = mergeState(merged, sourceState); + } else if (merged == null) { + merged = sourceState; + } + } + + // merge into the target, if needed + if (merged != null) { + map.transform(target, merged, mergeTransformation); + } + } + + protected abstract SV mergeState(SV a, SV b) throws Exception; + + final class MergeTransformation implements StateTransformationFunction { + + @Override + public SV apply(SV targetState, SV merged) throws Exception { + if (targetState != null) { + return mergeState(targetState, merged); + } else { + return merged; + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapState.java new file mode 100644 index 0000000..c93ea6a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractHeapState.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap.async; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; +import org.apache.flink.runtime.state.KvState; +import org.apache.flink.util.Preconditions; + +/** + * Base class for partitioned {@link ListState} implementations that are backed by a regular + * heap hash map. The concrete implementations define how the state is checkpointed. + * + * @param The type of the key. + * @param The type of the namespace. 
+ * @param The type of the values in the state.
+ * @param The type of State
+ * @param The type of StateDescriptor for the State S
+ */
+public abstract class AbstractHeapState>
+		implements KvState, State {
+
+	/** Map containing the actual key/value pairs */
+	protected final StateTable stateTable;
+
+	/** This holds the name of the state and can create an initial default value for the state. */
+	protected final SD stateDesc;
+
+	/** The current namespace, which the access methods will refer to. */
+	protected N currentNamespace;
+
+	protected final TypeSerializer keySerializer;
+
+	protected final TypeSerializer namespaceSerializer;
+
+	/**
+	 * Creates a new key/value state for the given hash map of key/value pairs.
+	 *
+	 * @param stateDesc The state identifier for the state. This contains name
+	 * and can create a default state value.
+	 * @param stateTable The state table to use in this key/value state. May contain initial state.
+	 */
+	protected AbstractHeapState(
+			SD stateDesc,
+			StateTable stateTable,
+			TypeSerializer keySerializer,
+			TypeSerializer namespaceSerializer) {
+
+		this.stateDesc = stateDesc;
+		this.stateTable = Preconditions.checkNotNull(stateTable, "State table must not be null.");
+		this.keySerializer = keySerializer;
+		this.namespaceSerializer = namespaceSerializer;
+		this.currentNamespace = null;
+	}
+
+	// ------------------------------------------------------------------------
+
+
+	public final void clear() {
+		stateTable.remove(currentNamespace);
+	}
+
+	public final void setCurrentNamespace(N namespace) {
+		this.currentNamespace = Preconditions.checkNotNull(namespace, "Namespace must not be null.");
+	}
+
+	public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
+		Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
+
+		Tuple2 keyAndNamespace = KvStateRequestSerializer.deserializeKeyAndNamespace(
+				serializedKeyAndNamespace, keySerializer, namespaceSerializer);
+
+		return getSerializedValue(keyAndNamespace.f0, keyAndNamespace.f1);
+	}
+
+	public byte[] getSerializedValue(K key, N namespace) throws Exception {
+		Preconditions.checkState(namespace != null, "No namespace given.");
+		Preconditions.checkState(key != null, "No key given.");
+
+		SV result = stateTable.get(key, namespace);
+
+		if (result == null) {
+			return null;
+		}
+
+		@SuppressWarnings("unchecked,rawtypes")
+		TypeSerializer serializer = stateDesc.getSerializer();
+		return KvStateRequestSerializer.serializeValue(result, serializer);
+	}
+
+	/**
+	 * This should only be used for testing.
+	 */
+	@VisibleForTesting
+	public StateTable getStateTable() {
+		return stateTable;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractStateTableSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractStateTableSnapshot.java
new file mode 100644
index 0000000..8a1d3f3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AbstractStateTableSnapshot.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +/** + * Abstract class to encapsulate the logic to take snapshots of {@link StateTable} implementations and also defines how + * the snapshot is written during the serialization phase of checkpointing. + */ +@Internal +abstract class AbstractStateTableSnapshot> implements StateTableSnapshot { + + /** + * The {@link StateTable} from which this snapshot was created. + */ + final T owningStateTable; + + /** + * Creates a new {@link AbstractStateTableSnapshot} for and owned by the given table. + * + * @param owningStateTable the {@link StateTable} for which this object represents a snapshot. + */ + AbstractStateTableSnapshot(T owningStateTable) { + this.owningStateTable = Preconditions.checkNotNull(owningStateTable); + } + + /** + * Optional hook to release resources for this snapshot at the end of its lifecycle. + */ + @Override + public void release() { + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AsyncHeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AsyncHeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AsyncHeapKeyedStateBackend.java new file mode 100644 index 0000000..e19ed00 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/AsyncHeapKeyedStateBackend.java @@ -0,0 +1,433 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap.async; + +import org.apache.commons.collections.map.HashedMap; +import org.apache.commons.io.IOUtils; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.migration.MigrationUtil; +import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable; +import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.ArrayListSerializer; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DoneFuture; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeOffsets; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; +import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A {@link AbstractKeyedStateBackend} that keeps state on the Java Heap and will serialize state to + * streams provided by a {@link CheckpointStreamFactory} upon + * checkpointing. + * + * @param The key by which state is keyed. + */ +public class AsyncHeapKeyedStateBackend extends AbstractKeyedStateBackend { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncHeapKeyedStateBackend.class); + + /** + * Map of state tables that stores all state of key/value states. We store it centrally so + * that we can easily checkpoint/restore it. + * + *
The actual parameters of StateTable are {@code StateTable>} + * but we can't put them here because different key/value states with different types and + * namespace types share this central list of tables. + */ + private final HashMap> stateTables = new HashMap<>(); + + public AsyncHeapKeyedStateBackend( + TaskKvStateRegistry kvStateRegistry, + TypeSerializer keySerializer, + ClassLoader userCodeClassLoader, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange) { + + super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange); + LOG.info("Initializing heap keyed state backend with stream factory."); + } + + // ------------------------------------------------------------------------ + // state backend operations + // ------------------------------------------------------------------------ + + private StateTable tryRegisterStateTable( + TypeSerializer namespaceSerializer, StateDescriptor stateDesc) { + + return tryRegisterStateTable( + stateDesc.getName(), stateDesc.getType(), + namespaceSerializer, stateDesc.getSerializer()); + } + + private StateTable tryRegisterStateTable( + String stateName, + StateDescriptor.Type stateType, + TypeSerializer namespaceSerializer, + TypeSerializer valueSerializer) { + + final RegisteredBackendStateMetaInfo newMetaInfo = + new RegisteredBackendStateMetaInfo<>(stateType, stateName, namespaceSerializer, valueSerializer); + + @SuppressWarnings("unchecked") + StateTable stateTable = (StateTable) stateTables.get(stateName); + + if (stateTable == null) { + stateTable = newStateTable(newMetaInfo); + stateTables.put(stateName, stateTable); + } else { + if (!newMetaInfo.isCompatibleWith(stateTable.getMetaInfo())) { + throw new RuntimeException("Trying to access state using incompatible meta info, was " + + stateTable.getMetaInfo() + " trying access with " + newMetaInfo); + } + stateTable.setMetaInfo(newMetaInfo); + } + return stateTable; + } + + private boolean hasRegisteredState() { + return !stateTables.isEmpty(); + } + + @Override + public ValueState createValueState( + TypeSerializer namespaceSerializer, + ValueStateDescriptor stateDesc) throws Exception { + + StateTable stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc); + return new HeapValueState<>(stateDesc, stateTable, keySerializer, namespaceSerializer); + } + + @Override + public ListState createListState( + TypeSerializer namespaceSerializer, + ListStateDescriptor stateDesc) throws Exception { + + // the list state does some manual mapping, because the state is typed to the generic + // 'List' interface, but we want to use an implementation typed to ArrayList + // using a more specialized implementation opens up runtime optimizations + + StateTable> stateTable = tryRegisterStateTable( + stateDesc.getName(), + stateDesc.getType(), + namespaceSerializer, + new ArrayListSerializer(stateDesc.getSerializer())); + + return new HeapListState<>(stateDesc, stateTable, keySerializer, namespaceSerializer); + } + + @Override + public ReducingState createReducingState( + TypeSerializer namespaceSerializer, + ReducingStateDescriptor stateDesc) throws Exception { + + StateTable stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc); + return new HeapReducingState<>(stateDesc, stateTable, keySerializer, namespaceSerializer); + } + + @Override + public FoldingState createFoldingState( + TypeSerializer namespaceSerializer, + FoldingStateDescriptor stateDesc) throws Exception { + + StateTable stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc); + 
return new HeapFoldingState<>(stateDesc, stateTable, keySerializer, namespaceSerializer); + } + + @Override + @SuppressWarnings("unchecked") + public RunnableFuture snapshot( + final long checkpointId, + final long timestamp, + final CheckpointStreamFactory streamFactory) throws Exception { + + if (!hasRegisteredState()) { + return DoneFuture.nullValue(); + } + + long syncStartTime = System.currentTimeMillis(); + + Preconditions.checkState(stateTables.size() <= Short.MAX_VALUE, + "Too many KV-States: " + stateTables.size() + + ". Currently at most " + Short.MAX_VALUE + " states are supported"); + + List> metaInfoProxyList = new ArrayList<>(stateTables.size()); + + final Map kVStateToId = new HashMap<>(stateTables.size()); + + final Map, StateTableSnapshot> cowStateStableSnapshots = new HashedMap(stateTables.size()); + + for (Map.Entry> kvState : stateTables.entrySet()) { + RegisteredBackendStateMetaInfo metaInfo = kvState.getValue().getMetaInfo(); + KeyedBackendSerializationProxy.StateMetaInfo metaInfoProxy = new KeyedBackendSerializationProxy.StateMetaInfo( + metaInfo.getStateType(), + metaInfo.getName(), + metaInfo.getNamespaceSerializer(), + metaInfo.getStateSerializer()); + + metaInfoProxyList.add(metaInfoProxy); + kVStateToId.put(kvState.getKey(), kVStateToId.size()); + StateTable stateTable = kvState.getValue(); + if (null != stateTable) { + cowStateStableSnapshots.put(stateTable, stateTable.createSnapshot()); + } + } + + final KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerializationProxy(keySerializer, metaInfoProxyList); + + //--------------------------------------------------- this becomes the end of sync part + + // implementation of the async IO operation, based on FutureTask + final AbstractAsyncIOCallable ioCallable = + new AbstractAsyncIOCallable() { + + AtomicBoolean open = new AtomicBoolean(false); + + @Override + public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception { + if (open.compareAndSet(false, true)) { + CheckpointStreamFactory.CheckpointStateOutputStream stream = + streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp); + try { + cancelStreamRegistry.registerClosable(stream); + return stream; + } catch (Exception ex) { + open.set(false); + throw ex; + } + } else { + throw new IOException("Operation already opened."); + } + } + + @Override + public KeyGroupsStateHandle performOperation() throws Exception { + long asyncStartTime = System.currentTimeMillis(); + CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle(); + DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(stream); + serializationProxy.write(outView); + + long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()]; + + for (int keyGroupPos = 0; keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) { + int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos); + keyGroupRangeOffsets[keyGroupPos] = stream.getPos(); + outView.writeInt(keyGroupId); + + for (Map.Entry> kvState : stateTables.entrySet()) { + outView.writeShort(kVStateToId.get(kvState.getKey())); + cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(outView, keyGroupId); + } + } + + if (open.compareAndSet(true, false)) { + StreamStateHandle streamStateHandle = stream.closeAndGetHandle(); + KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets); + final KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, 
streamStateHandle);
+
+						LOG.info("Heap backend snapshot ({}, asynchronous part) in thread {} took {} ms.",
+								streamFactory, Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime));
+
+						return keyGroupsStateHandle;
+					} else {
+						throw new IOException("Checkpoint stream already closed.");
+					}
+				}
+
+				@Override
+				public void done(boolean canceled) {
+					if (open.compareAndSet(true, false)) {
+						CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
+						if (null != stream) {
+							cancelStreamRegistry.unregisterClosable(stream);
+							IOUtils.closeQuietly(stream);
+						}
+					}
+					for (StateTableSnapshot snapshot : cowStateStableSnapshots.values()) {
+						snapshot.release();
+					}
+				}
+			};
+
+		AsyncStoppableTaskWithCallback task = AsyncStoppableTaskWithCallback.from(ioCallable);
+
+		LOG.info("Heap backend snapshot (" + streamFactory + ", synchronous part) in thread " +
+				Thread.currentThread() + " took " + (System.currentTimeMillis() - syncStartTime) + " ms.");
+
+		return task;
+	}
+
+	@SuppressWarnings("deprecation")
+	@Override
+	public void restore(Collection restoredState) throws Exception {
+		LOG.info("Initializing heap keyed state backend from snapshot.");
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Restoring snapshot from state handles: {}.", restoredState);
+		}
+
+		if (MigrationUtil.isOldSavepointKeyedState(restoredState)) {
+			throw new UnsupportedOperationException(
+					"This async.HeapKeyedStateBackend does not support restore from old savepoints.");
+		} else {
+			restorePartitionedState(restoredState);
+		}
+	}
+
+	@SuppressWarnings({"unchecked"})
+	private void restorePartitionedState(Collection state) throws Exception {
+
+		final Map kvStatesById = new HashMap<>();
+		int numRegisteredKvStates = 0;
+		stateTables.clear();
+
+		for (KeyGroupsStateHandle keyGroupsHandle : state) {
+
+			if (keyGroupsHandle == null) {
+				continue;
+			}
+
+			FSDataInputStream fsDataInputStream = keyGroupsHandle.openInputStream();
+			cancelStreamRegistry.registerClosable(fsDataInputStream);
+
+			try {
+				DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);
+
+				KeyedBackendSerializationProxy serializationProxy =
+						new KeyedBackendSerializationProxy(userCodeClassLoader);
+
+				serializationProxy.read(inView);
+
+				List> metaInfoList =
+						serializationProxy.getNamedStateSerializationProxies();
+
+				for (KeyedBackendSerializationProxy.StateMetaInfo metaInfoSerializationProxy : metaInfoList) {
+
+					StateTable stateTable = stateTables.get(metaInfoSerializationProxy.getStateName());
+
+					// important: only create a new table if we did not already create it previously
+					if (null == stateTable) {
+
+						RegisteredBackendStateMetaInfo registeredBackendStateMetaInfo =
+								new RegisteredBackendStateMetaInfo<>(metaInfoSerializationProxy);
+
+						stateTable = newStateTable(registeredBackendStateMetaInfo);
+						stateTables.put(metaInfoSerializationProxy.getStateName(), stateTable);
+						kvStatesById.put(numRegisteredKvStates, metaInfoSerializationProxy.getStateName());
+						++numRegisteredKvStates;
+					}
+				}
+
+				for (Tuple2 groupOffset : keyGroupsHandle.getGroupRangeOffsets()) {
+					int keyGroupIndex = groupOffset.f0;
+					long offset = groupOffset.f1;
+					fsDataInputStream.seek(offset);
+
+					int writtenKeyGroupIndex = inView.readInt();
+
+					Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex,
+							"Unexpected key-group in restore.");
+
+					for (int i = 0; i < metaInfoList.size(); i++) {
+						int kvStateId = inView.readShort();
+						StateTable stateTable = stateTables.get(kvStatesById.get(kvStateId));
+
+						// Hardcoding 2 as version will lead to the right method for the
+						// serialization format. Due to the backport, we should keep this fix and do
+						// not allow restore from a different format.
+						StateTableByKeyGroupReader keyGroupReader =
+								StateTableByKeyGroupReaders.readerForVersion(
+										stateTable,
+										2);
+
+						keyGroupReader.readMappingsInKeyGroup(inView, keyGroupIndex);
+					}
+				}
+			} finally {
+				cancelStreamRegistry.unregisterClosable(fsDataInputStream);
+				IOUtils.closeQuietly(fsDataInputStream);
+			}
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "HeapKeyedStateBackend";
+	}
+
+	/**
+	 * Returns the total number of state entries across all keys/namespaces.
+	 */
+	@VisibleForTesting
+	@SuppressWarnings("unchecked")
+	public int numStateEntries() {
+		int sum = 0;
+		for (StateTable stateTable : stateTables.values()) {
+			sum += stateTable.size();
+		}
+		return sum;
+	}
+
+	/**
+	 * Returns the total number of state entries across all keys for the given namespace.
+	 */
+	@VisibleForTesting
+	public int numStateEntries(Object namespace) {
+		int sum = 0;
+		for (StateTable stateTable : stateTables.values()) {
+			sum += stateTable.sizeOfNamespace(namespace);
+		}
+		return sum;
+	}
+
+	private StateTable newStateTable(RegisteredBackendStateMetaInfo newMetaInfo) {
+		return new CopyOnWriteStateTable<>(this, newMetaInfo);
+	}
+}
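The central contract of this backport is visible in snapshot() above: the synchronous part (capturing copy-on-write snapshots of the state tables) completes before the method returns, and all serialization happens only when the returned RunnableFuture is executed. A minimal sketch of a caller driving both phases; the class, method, and executor names are illustrative, not Flink API (the real driver is the task's checkpointing machinery):

    import java.util.concurrent.Executor;
    import java.util.concurrent.RunnableFuture;

    import org.apache.flink.runtime.state.CheckpointStreamFactory;
    import org.apache.flink.runtime.state.KeyGroupsStateHandle;
    import org.apache.flink.runtime.state.heap.async.AsyncHeapKeyedStateBackend;

    public class SnapshotDriverSketch {

        /**
         * Drives one checkpoint against the backend. The synchronous phase runs
         * inside snapshot() itself; the returned future performs the asynchronous
         * serialization when executed.
         */
        public static KeyGroupsStateHandle takeSnapshot(
                AsyncHeapKeyedStateBackend<?> backend,
                long checkpointId,
                long timestamp,
                CheckpointStreamFactory streamFactory,
                Executor asyncExecutor) throws Exception {

            RunnableFuture<KeyGroupsStateHandle> future =
                backend.snapshot(checkpointId, timestamp, streamFactory);

            // Element processing can continue on the task thread while this runs.
            asyncExecutor.execute(future);

            // Block only when the handle is actually needed, e.g. for the ack.
            return future.get();
        }
    }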