Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6EC1C18F9B for ; Tue, 20 Oct 2015 07:58:59 +0000 (UTC) Received: (qmail 78839 invoked by uid 500); 20 Oct 2015 07:58:54 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 78751 invoked by uid 500); 20 Oct 2015 07:58:54 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 77691 invoked by uid 99); 20 Oct 2015 07:58:53 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Oct 2015 07:58:53 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4F85EDFFED; Tue, 20 Oct 2015 07:58:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Date: Tue, 20 Oct 2015 07:59:21 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [30/47] flink git commit: [FLINK-2354] [runtime] Replace old StateHandleProvider by StateStorageHelper in ZooKeeperStateHandleStore http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileStreamStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileStreamStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileStreamStateHandle.java deleted file mode 100644 index 79512d7..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FileStreamStateHandle.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.state.filesystem; - -import org.apache.flink.core.fs.Path; -import org.apache.flink.streaming.api.state.StreamStateHandle; - -import java.io.InputStream; - -/** - * A state handle that points to state in a file system, accessible as an input stream. - */ -public class FileStreamStateHandle extends AbstractFileState implements StreamStateHandle { - - private static final long serialVersionUID = -6826990484549987311L; - - /** - * Creates a new FileStreamStateHandle pointing to state at the given file path. - * - * @param filePath The path to the file containing the checkpointed state. - */ - public FileStreamStateHandle(Path filePath) { - super(filePath); - } - - @Override - public InputStream getState(ClassLoader userCodeClassLoader) throws Exception { - return getFileSystem().open(getFilePath()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvState.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvState.java deleted file mode 100644 index 107a3be..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvState.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.state.filesystem; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper; -import org.apache.flink.streaming.api.state.AbstractHeapKvState; - -import java.io.DataOutputStream; -import java.util.HashMap; - -/** - * Heap-backed key/value state that is snapshotted into files. - * - * @param The type of the key. - * @param The type of the value. - */ -public class FsHeapKvState extends AbstractHeapKvState { - - /** The file system state backend backing snapshots of this state */ - private final FsStateBackend backend; - - /** - * Creates a new and empty key/value state. - * - * @param keySerializer The serializer for the key. - * @param valueSerializer The serializer for the value. - * @param defaultValue The value that is returned when no other value has been associated with a key, yet. - * @param backend The file system state backend backing snapshots of this state - */ - public FsHeapKvState(TypeSerializer keySerializer, TypeSerializer valueSerializer, - V defaultValue, FsStateBackend backend) { - super(keySerializer, valueSerializer, defaultValue); - this.backend = backend; - } - - /** - * Creates a new key/value state with the given state contents. - * This method is used to re-create key/value state with existing data, for example from - * a snapshot. - * - * @param keySerializer The serializer for the key. - * @param valueSerializer The serializer for the value. - * @param defaultValue The value that is returned when no other value has been associated with a key, yet. - * @param state The map of key/value pairs to initialize the state with. - * @param backend The file system state backend backing snapshots of this state - */ - public FsHeapKvState(TypeSerializer keySerializer, TypeSerializer valueSerializer, - V defaultValue, HashMap state, FsStateBackend backend) { - super(keySerializer, valueSerializer, defaultValue, state); - this.backend = backend; - } - - - @Override - public FsHeapKvStateSnapshot shapshot(long checkpointId, long timestamp) throws Exception { - // first, create an output stream to write to - try (FsStateBackend.FsCheckpointStateOutputStream out = - backend.createCheckpointStateOutputStream(checkpointId, timestamp)) { - - // serialize the state to the output stream - OutputViewDataOutputStreamWrapper outView = - new OutputViewDataOutputStreamWrapper(new DataOutputStream(out)); - outView.writeInt(size()); - writeStateToOutputView(outView); - outView.flush(); - - // create a handle to the state - return new FsHeapKvStateSnapshot<>(getKeySerializer(), getValueSerializer(), out.closeAndGetPath()); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvStateSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvStateSnapshot.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvStateSnapshot.java deleted file mode 100644 index c7117f8..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsHeapKvStateSnapshot.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.state.filesystem; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.core.fs.Path; -import org.apache.flink.core.memory.InputViewDataInputStreamWrapper; -import org.apache.flink.streaming.api.state.KvStateSnapshot; - -import java.io.DataInputStream; -import java.util.HashMap; - -/** - * A snapshot of a heap key/value state stored in a file. - * - * @param The type of the key in the snapshot state. - * @param The type of the value in the snapshot state. - */ -public class FsHeapKvStateSnapshot extends AbstractFileState implements KvStateSnapshot { - - private static final long serialVersionUID = 1L; - - /** Name of the key serializer class */ - private final String keySerializerClassName; - - /** Name of the value serializer class */ - private final String valueSerializerClassName; - - /** - * Creates a new state snapshot with data in the file system. - * - * @param keySerializer The serializer for the keys. - * @param valueSerializer The serializer for the values. - * @param filePath The path where the snapshot data is stored. - */ - public FsHeapKvStateSnapshot(TypeSerializer keySerializer, TypeSerializer valueSerializer, Path filePath) { - super(filePath); - this.keySerializerClassName = keySerializer.getClass().getName(); - this.valueSerializerClassName = valueSerializer.getClass().getName(); - } - - @Override - public FsHeapKvState restoreState( - FsStateBackend stateBackend, - final TypeSerializer keySerializer, - final TypeSerializer valueSerializer, - V defaultValue, - ClassLoader classLoader) throws Exception { - - // validity checks - if (!keySerializer.getClass().getName().equals(keySerializerClassName) || - !valueSerializer.getClass().getName().equals(valueSerializerClassName)) { - throw new IllegalArgumentException( - "Cannot restore the state from the snapshot with the given serializers. " + - "State (K/V) was serialized with (" + valueSerializerClassName + - "/" + keySerializerClassName + ")"); - } - - // state restore - try (FSDataInputStream inStream = stateBackend.getFileSystem().open(getFilePath())) { - InputViewDataInputStreamWrapper inView = new InputViewDataInputStreamWrapper(new DataInputStream(inStream)); - - final int numEntries = inView.readInt(); - HashMap stateMap = new HashMap<>(numEntries); - - for (int i = 0; i < numEntries; i++) { - K key = keySerializer.deserialize(inView); - V value = valueSerializer.deserialize(inView); - stateMap.put(key, value); - } - - return new FsHeapKvState(keySerializer, valueSerializer, defaultValue, stateMap, stateBackend); - } - catch (Exception e) { - throw new Exception("Failed to restore state from file system", e); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java deleted file mode 100644 index 3cbd227..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java +++ /dev/null @@ -1,412 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.state.filesystem; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.fs.FSDataOutputStream; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.streaming.api.state.StateBackend; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.io.Serializable; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.UUID; - -/** - * The file state backend is a state backend that stores the state of streaming jobs in a file system. - * - *

The state backend has one core directory into which it puts all checkpoint data. Inside that - * directory, it creates a directory per job, inside which each checkpoint gets a directory, with - * files for each state, for example: - * - * {@code hdfs://namenode:port/flink-checkpoints//chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8 } - */ -public class FsStateBackend extends StateBackend { - - private static final long serialVersionUID = -8191916350224044011L; - - private static final Logger LOG = LoggerFactory.getLogger(FsStateBackend.class); - - - /** The path to the directory for the checkpoint data, including the file system - * description via scheme and optional authority */ - private final Path basePath; - - /** The directory (job specific) into this initialized instance of the backend stores its data */ - private transient Path checkpointDirectory; - - /** Cached handle to the file system for file operations */ - private transient FileSystem filesystem; - - - /** - * Creates a new state backend that stores its checkpoint data in the file system and location - * defined by the given URI. - * - *

A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') - * must be accessible via {@link FileSystem#get(URI)}. - * - *

For a state backend targeting HDFS, this means that the URI must either specify the authority - * (host and port), or that the Hadoop configuration that describes that information must be in the - * classpath. - * - * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), - * and the path to teh checkpoint data directory. - * @throws IOException Thrown, if no file system can be found for the scheme in the URI. - */ - public FsStateBackend(String checkpointDataUri) throws IOException { - this(new Path(checkpointDataUri)); - } - - /** - * Creates a new state backend that stores its checkpoint data in the file system and location - * defined by the given URI. - * - *

A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') - * must be accessible via {@link FileSystem#get(URI)}. - * - *

For a state backend targeting HDFS, this means that the URI must either specify the authority - * (host and port), or that the Hadoop configuration that describes that information must be in the - * classpath. - * - * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), - * and the path to teh checkpoint data directory. - * @throws IOException Thrown, if no file system can be found for the scheme in the URI. - */ - public FsStateBackend(Path checkpointDataUri) throws IOException { - this(checkpointDataUri.toUri()); - } - - /** - * Creates a new state backend that stores its checkpoint data in the file system and location - * defined by the given URI. - * - *

A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') - * must be accessible via {@link FileSystem#get(URI)}. - * - *

For a state backend targeting HDFS, this means that the URI must either specify the authority - * (host and port), or that the Hadoop configuration that describes that information must be in the - * classpath. - * - * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), - * and the path to teh checkpoint data directory. - * @throws IOException Thrown, if no file system can be found for the scheme in the URI. - */ - public FsStateBackend(URI checkpointDataUri) throws IOException { - final String scheme = checkpointDataUri.getScheme(); - final String path = checkpointDataUri.getPath(); - - // some validity checks - if (scheme == null) { - throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " + - "Please specify the file system scheme explicitly in the URI."); - } - if (path == null) { - throw new IllegalArgumentException("The path to store the checkpoint data in is null. " + - "Please specify a directory path for the checkpoint data."); - } - if (path.length() == 0 || path.equals("/")) { - throw new IllegalArgumentException("Cannot use the root directory for checkpoints."); - } - - // we do a bit of work to make sure that the URI for the filesystem refers to exactly the same - // (distributed) filesystem on all hosts and includes full host/port information, even if the - // original URI did not include that. We count on the filesystem loading from the configuration - // to fill in the missing data. - - // try to grab the file system for this path/URI - this.filesystem = FileSystem.get(checkpointDataUri); - if (this.filesystem == null) { - throw new IOException("Could not find a file system for the given scheme in the available configurations."); - } - - URI fsURI = this.filesystem.getUri(); - try { - URI baseURI = new URI(fsURI.getScheme(), fsURI.getAuthority(), path, null, null); - this.basePath = new Path(baseURI); - } - catch (URISyntaxException e) { - throw new IOException( - String.format("Cannot create file system URI for checkpointDataUri %s and filesystem URI %s", - checkpointDataUri, fsURI), e); - } - } - - /** - * Gets the base directory where all state-containing files are stored. - * The job specific directory is created inside this directory. - * - * @return The base directory. - */ - public Path getBasePath() { - return basePath; - } - - /** - * Gets the directory where this state backend stores its checkpoint data. Will be null if - * the state backend has not been initialized. - * - * @return The directory where this state backend stores its checkpoint data. - */ - public Path getCheckpointDirectory() { - return checkpointDirectory; - } - - /** - * Checks whether this state backend is initialized. Note that initialization does not carry - * across serialization. After each serialization, the state backend needs to be initialized. - * - * @return True, if the file state backend has been initialized, false otherwise. - */ - public boolean isInitialized() { - return filesystem != null && checkpointDirectory != null; - } - - /** - * Gets the file system handle for the file system that stores the state for this backend. - * - * @return This backend's file system handle. - */ - public FileSystem getFileSystem() { - if (filesystem != null) { - return filesystem; - } - else { - throw new IllegalStateException("State backend has not been initialized."); - } - } - - // ------------------------------------------------------------------------ - // initialization and cleanup - // ------------------------------------------------------------------------ - - @Override - public void initializeForJob(JobID jobId) throws Exception { - Path dir = new Path(basePath, jobId.toString()); - - LOG.info("Initializing file state backend to URI " + dir); - - filesystem = basePath.getFileSystem(); - filesystem.mkdirs(dir); - - checkpointDirectory = dir; - } - - @Override - public void disposeAllStateForCurrentJob() throws Exception { - FileSystem fs = this.filesystem; - Path dir = this.checkpointDirectory; - - if (fs != null && dir != null) { - this.filesystem = null; - this.checkpointDirectory = null; - fs.delete(dir, true); - } - else { - throw new IllegalStateException("state backend has not been initialized"); - } - } - - @Override - public void close() throws Exception {} - - // ------------------------------------------------------------------------ - // state backend operations - // ------------------------------------------------------------------------ - - @Override - public FsHeapKvState createKvState( - TypeSerializer keySerializer, TypeSerializer valueSerializer, V defaultValue) throws Exception { - return new FsHeapKvState(keySerializer, valueSerializer, defaultValue, this); - } - - @Override - public StateHandle checkpointStateSerializable( - S state, long checkpointID, long timestamp) throws Exception - { - checkFileSystemInitialized(); - - // make sure the directory for that specific checkpoint exists - final Path checkpointDir = createCheckpointDirPath(checkpointID); - filesystem.mkdirs(checkpointDir); - - - Exception latestException = null; - - for (int attempt = 0; attempt < 10; attempt++) { - Path targetPath = new Path(checkpointDir, UUID.randomUUID().toString()); - FSDataOutputStream outStream; - try { - outStream = filesystem.create(targetPath, false); - } - catch (Exception e) { - latestException = e; - continue; - } - - ObjectOutputStream os = new ObjectOutputStream(outStream); - os.writeObject(state); - os.close(); - return new FileSerializableStateHandle(targetPath); - } - - throw new Exception("Could not open output stream for state backend", latestException); - } - - @Override - public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception { - checkFileSystemInitialized(); - - final Path checkpointDir = createCheckpointDirPath(checkpointID); - filesystem.mkdirs(checkpointDir); - - Exception latestException = null; - - for (int attempt = 0; attempt < 10; attempt++) { - Path targetPath = new Path(checkpointDir, UUID.randomUUID().toString()); - try { - FSDataOutputStream outStream = filesystem.create(targetPath, false); - return new FsCheckpointStateOutputStream(outStream, targetPath, filesystem); - } - catch (Exception e) { - latestException = e; - } - } - throw new Exception("Could not open output stream for state backend", latestException); - } - - // ------------------------------------------------------------------------ - // utilities - // ------------------------------------------------------------------------ - - private void checkFileSystemInitialized() throws IllegalStateException { - if (filesystem == null || checkpointDirectory == null) { - throw new IllegalStateException("filesystem has not been re-initialized after deserialization"); - } - } - - private Path createCheckpointDirPath(long checkpointID) { - return new Path(checkpointDirectory, "chk-" + checkpointID); - } - - @Override - public String toString() { - return checkpointDirectory == null ? - "File State Backend @ " + basePath : - "File State Backend (initialized) @ " + checkpointDirectory; - } - - // ------------------------------------------------------------------------ - // Output stream for state checkpointing - // ------------------------------------------------------------------------ - - /** - * A CheckpointStateOutputStream that writes into a file and returns the path to that file upon - * closing. - */ - public static final class FsCheckpointStateOutputStream extends CheckpointStateOutputStream { - - private final FSDataOutputStream outStream; - - private final Path filePath; - - private final FileSystem fs; - - private boolean closed; - - FsCheckpointStateOutputStream(FSDataOutputStream outStream, Path filePath, FileSystem fs) { - this.outStream = outStream; - this.filePath = filePath; - this.fs = fs; - } - - - @Override - public void write(int b) throws IOException { - outStream.write(b); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - outStream.write(b, off, len); - } - - @Override - public void flush() throws IOException { - outStream.flush(); - } - - /** - * If the stream is only closed, we remove the produced file (cleanup through the auto close - * feature, for example). This method throws no exception if the deletion fails, but only - * logs the error. - */ - @Override - public void close() { - synchronized (this) { - if (!closed) { - closed = true; - try { - outStream.close(); - fs.delete(filePath, false); - - // attempt to delete the parent (will fail and be ignored if the parent has more files) - try { - fs.delete(filePath.getParent(), false); - } catch (IOException ignored) {} - } - catch (Exception e) { - LOG.warn("Cannot delete closed and discarded state stream to " + filePath, e); - } - } - } - } - - @Override - public FileStreamStateHandle closeAndGetHandle() throws IOException { - return new FileStreamStateHandle(closeAndGetPath()); - } - - /** - * Closes the stream and returns the path to the file that contains the stream's data. - * @return The path to the file that contains the stream's data. - * @throws IOException Thrown if the stream cannot be successfully closed. - */ - public Path closeAndGetPath() throws IOException { - synchronized (this) { - if (!closed) { - closed = true; - outStream.close(); - return filePath; - } - else { - throw new IOException("Stream has already been closed and discarded."); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackendFactory.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackendFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackendFactory.java deleted file mode 100644 index f0ad6bd..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackendFactory.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.state.filesystem; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.core.fs.Path; -import org.apache.flink.streaming.api.state.StateBackendFactory; - -/** - * A factory that creates an {@link org.apache.flink.streaming.api.state.filesystem.FsStateBackend} - * from a configuration. - */ -public class FsStateBackendFactory implements StateBackendFactory { - - /** The key under which the config stores the directory where checkpoints should be stored */ - public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.fs.checkpointdir"; - - - @Override - public FsStateBackend createFromConfig(Configuration config) throws Exception { - String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null); - - if (checkpointDirURI == null) { - throw new IllegalConfigurationException( - "Cannot create the file system state backend: The configuration does not specify the " + - "checkpoint directory '" + CHECKPOINT_DIRECTORY_URI_CONF_KEY + '\''); - } - - try { - Path path = new Path(checkpointDirURI); - return new FsStateBackend(path); - } - catch (IllegalArgumentException e) { - throw new Exception("Cannot initialize File System State Backend with URI '" - + checkpointDirURI + '.', e); - } - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/ByteStreamStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/ByteStreamStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/ByteStreamStateHandle.java deleted file mode 100644 index 7952e58..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/ByteStreamStateHandle.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.state.memory; - -import org.apache.flink.streaming.api.state.StreamStateHandle; - -import java.io.ByteArrayInputStream; -import java.io.InputStream; - -/** - * A state handle that contains stream state in a byte array. - */ -public final class ByteStreamStateHandle implements StreamStateHandle { - - private static final long serialVersionUID = -5280226231200217594L; - - /** the state data */ - private final byte[] data; - - /** - * Creates a new ByteStreamStateHandle containing the given data. - * - * @param data The state data. - */ - public ByteStreamStateHandle(byte[] data) { - this.data = data; - } - - @Override - public InputStream getState(ClassLoader userCodeClassLoader) { - return new ByteArrayInputStream(data); - } - - @Override - public void discardState() {} -} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemHeapKvState.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemHeapKvState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemHeapKvState.java deleted file mode 100644 index e611887..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemHeapKvState.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.state.memory; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.util.DataOutputSerializer; -import org.apache.flink.streaming.api.state.AbstractHeapKvState; - -import java.util.HashMap; - -/** - * Heap-backed key/value state that is snapshotted into a serialized memory copy. - * - * @param The type of the key. - * @param The type of the value. - */ -public class MemHeapKvState extends AbstractHeapKvState { - - public MemHeapKvState(TypeSerializer keySerializer, TypeSerializer valueSerializer, V defaultValue) { - super(keySerializer, valueSerializer, defaultValue); - } - - public MemHeapKvState(TypeSerializer keySerializer, TypeSerializer valueSerializer, - V defaultValue, HashMap state) { - super(keySerializer, valueSerializer, defaultValue, state); - } - - @Override - public MemoryHeapKvStateSnapshot shapshot(long checkpointId, long timestamp) throws Exception { - DataOutputSerializer ser = new DataOutputSerializer(Math.max(size() * 16, 16)); - writeStateToOutputView(ser); - byte[] bytes = ser.getCopyOfBuffer(); - - return new MemoryHeapKvStateSnapshot(getKeySerializer(), getValueSerializer(), bytes, size()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryHeapKvStateSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryHeapKvStateSnapshot.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryHeapKvStateSnapshot.java deleted file mode 100644 index 7f50379..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryHeapKvStateSnapshot.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.state.memory; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.util.DataInputDeserializer; -import org.apache.flink.streaming.api.state.KvStateSnapshot; - -import java.util.HashMap; - -/** - * A snapshot of a {@link MemHeapKvState} for a checkpoint. The data is stored in a heap byte - * array, in serialized form. - * - * @param The type of the key in the snapshot state. - * @param The type of the value in the snapshot state. - */ -public class MemoryHeapKvStateSnapshot implements KvStateSnapshot { - - private static final long serialVersionUID = 1L; - - /** Name of the key serializer class */ - private final String keySerializerClassName; - - /** Name of the value serializer class */ - private final String valueSerializerClassName; - - /** The serialized data of the state key/value pairs */ - private final byte[] data; - - /** The number of key/value pairs */ - private final int numEntries; - - /** - * Creates a new heap memory state snapshot. - * - * @param keySerializer The serializer for the keys. - * @param valueSerializer The serializer for the values. - * @param data The serialized data of the state key/value pairs - * @param numEntries The number of key/value pairs - */ - public MemoryHeapKvStateSnapshot(TypeSerializer keySerializer, - TypeSerializer valueSerializer, byte[] data, int numEntries) { - this.keySerializerClassName = keySerializer.getClass().getName(); - this.valueSerializerClassName = valueSerializer.getClass().getName(); - this.data = data; - this.numEntries = numEntries; - } - - - @Override - public MemHeapKvState restoreState( - MemoryStateBackend stateBackend, - final TypeSerializer keySerializer, - final TypeSerializer valueSerializer, - V defaultValue, - ClassLoader classLoader) throws Exception { - - // validity checks - if (!keySerializer.getClass().getName().equals(keySerializerClassName) || - !valueSerializer.getClass().getName().equals(valueSerializerClassName)) { - throw new IllegalArgumentException( - "Cannot restore the state from the snapshot with the given serializers. " + - "State (K/V) was serialized with (" + valueSerializerClassName + - "/" + keySerializerClassName + ")"); - } - - // restore state - HashMap stateMap = new HashMap<>(numEntries); - DataInputDeserializer in = new DataInputDeserializer(data, 0, data.length); - - for (int i = 0; i < numEntries; i++) { - K key = keySerializer.deserialize(in); - V value = valueSerializer.deserialize(in); - stateMap.put(key, value); - } - - return new MemHeapKvState(keySerializer, valueSerializer, defaultValue, stateMap); - } - - /** - * Discarding the heap state is a no-op. - */ - @Override - public void discardState() {} -} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java deleted file mode 100644 index 05368bd..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.state.memory; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.streaming.api.state.StateBackend; -import org.apache.flink.streaming.api.state.StreamStateHandle; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.Serializable; - -/** - * A {@link StateBackend} that stores all its data and checkpoints in memory and has no - * capabilities to spill to disk. Checkpoints are serialized and the serialized data is - * transferred - */ -public class MemoryStateBackend extends StateBackend { - - private static final long serialVersionUID = 4109305377809414635L; - - /** The default maximal size that the snapshotted memory state may have (5 MiBytes) */ - private static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024; - - /** The maximal size that the snapshotted memory state may have */ - private final int maxStateSize; - - /** - * Creates a new memory state backend that accepts states whose serialized forms are - * up to the default state size (5 MB). - */ - public MemoryStateBackend() { - this(DEFAULT_MAX_STATE_SIZE); - } - - /** - * Creates a new memory state backend that accepts states whose serialized forms are - * up to the given number of bytes. - * - * @param maxStateSize The maximal size of the serialized state - */ - public MemoryStateBackend(int maxStateSize) { - this.maxStateSize = maxStateSize; - } - - // ------------------------------------------------------------------------ - // initialization and cleanup - // ------------------------------------------------------------------------ - - @Override - public void initializeForJob(JobID job) { - // nothing to do here - } - - @Override - public void disposeAllStateForCurrentJob() { - // nothing to do here, GC will do it - } - - @Override - public void close() throws Exception {} - - // ------------------------------------------------------------------------ - // State backend operations - // ------------------------------------------------------------------------ - - @Override - public MemHeapKvState createKvState( - TypeSerializer keySerializer, TypeSerializer valueSerializer, V defaultValue) { - return new MemHeapKvState(keySerializer, valueSerializer, defaultValue); - } - - /** - * Serialized the given state into bytes using Java serialization and creates a state handle that - * can re-create that state. - * - * @param state The state to checkpoint. - * @param checkpointID The ID of the checkpoint. - * @param timestamp The timestamp of the checkpoint. - * @param The type of the state. - * - * @return A state handle that contains the given state serialized as bytes. - * @throws Exception Thrown, if the serialization fails. - */ - @Override - public StateHandle checkpointStateSerializable( - S state, long checkpointID, long timestamp) throws Exception - { - SerializedStateHandle handle = new SerializedStateHandle<>(state); - checkSize(handle.getSizeOfSerializedState(), maxStateSize); - return new SerializedStateHandle(state); - } - - @Override - public CheckpointStateOutputStream createCheckpointStateOutputStream( - long checkpointID, long timestamp) throws Exception - { - return new MemoryCheckpointOutputStream(maxStateSize); - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - @Override - public String toString() { - return "MemoryStateBackend (data in heap memory / checkpoints to JobManager)"; - } - - static void checkSize(int size, int maxSize) throws IOException { - if (size > maxSize) { - throw new IOException( - "Size of the state is larger than the maximum permitted memory-backed state. Size=" - + size + " , maxSize=" + maxSize - + " . Consider using a different state backend, like the File System State backend."); - } - } - - // ------------------------------------------------------------------------ - - /** - * A CheckpointStateOutputStream that writes into a byte array. - */ - public static final class MemoryCheckpointOutputStream extends CheckpointStateOutputStream { - - private final ByteArrayOutputStream os = new ByteArrayOutputStream(); - - private final int maxSize; - - private boolean closed; - - public MemoryCheckpointOutputStream(int maxSize) { - this.maxSize = maxSize; - } - - @Override - public void write(int b) { - os.write(b); - } - - @Override - public void write(byte[] b, int off, int len) { - os.write(b, off, len); - } - - // -------------------------------------------------------------------- - - @Override - public void close() { - closed = true; - os.reset(); - } - - @Override - public StreamStateHandle closeAndGetHandle() throws IOException { - return new ByteStreamStateHandle(closeAndGetBytes()); - } - - /** - * Closes the stream and returns the byte array containing the stream's data. - * @return The byte array containing the stream's data. - * @throws IOException Thrown if the size of the data exceeds the maximal - */ - public byte[] closeAndGetBytes() throws IOException { - if (!closed) { - checkSize(os.size(), maxSize); - byte[] bytes = os.toByteArray(); - close(); - return bytes; - } - else { - throw new IllegalStateException("stream has already been closed"); - } - } - } - - // ------------------------------------------------------------------------ - // Static default instance - // ------------------------------------------------------------------------ - - /** The default instance of this state backend, using the default maximal state size */ - private static final MemoryStateBackend DEFAULT_INSTANCE = new MemoryStateBackend(); - - /** - * Gets the default instance of this state backend, using the default maximal state size. - * @return The default instance of this state backend. - */ - public static MemoryStateBackend defaultInstance() { - return DEFAULT_INSTANCE; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/SerializedStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/SerializedStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/SerializedStateHandle.java deleted file mode 100644 index 163cadd..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/SerializedStateHandle.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.state.memory; - -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.util.SerializedValue; - -import java.io.IOException; - -/** - * A state handle that represents its state in serialized form as bytes. - * - * @param The type of state represented by this state handle. - */ -public class SerializedStateHandle extends SerializedValue implements StateHandle { - - private static final long serialVersionUID = 4145685722538475769L; - - public SerializedStateHandle(T value) throws IOException { - super(value); - } - - @Override - public T getState(ClassLoader classLoader) throws Exception { - return deserializeValue(classLoader); - } - - /** - * Discarding heap-memory backed state is a no-op, so this method does nothing. - */ - @Override - public void discardState() {} -} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java index cf8575e..9964760 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java @@ -29,7 +29,7 @@ import org.apache.flink.runtime.util.MathUtils; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.TimestampedCollector; -import org.apache.flink.streaming.api.state.StateBackend; +import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.operators.Triggerable; http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 72a8c25..8c58e29 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -30,17 +30,16 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; -import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; import org.apache.flink.runtime.util.event.EventListener; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamOperator; -import org.apache.flink.streaming.api.state.StateBackend; -import org.apache.flink.streaming.api.state.StateBackendFactory; -import org.apache.flink.streaming.api.state.filesystem.FsStateBackend; -import org.apache.flink.streaming.api.state.filesystem.FsStateBackendFactory; -import org.apache.flink.streaming.api.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateBackendFactory; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -493,55 +492,52 @@ public abstract class StreamTask> private StateBackend createStateBackend() throws Exception { StateBackend configuredBackend = configuration.getStateBackend(userClassLoader); - + if (configuredBackend != null) { // backend has been configured on the environment LOG.info("Using user-defined state backend: " + configuredBackend); return configuredBackend; - } - else { + } else { // see if we have a backend specified in the configuration Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration(); String backendName = flinkConfig.getString(ConfigConstants.STATE_BACKEND, null); - + if (backendName == null) { LOG.warn("No state backend has been specified, using default state backend (Memory / JobManager)"); backendName = "jobmanager"; } - + backendName = backendName.toLowerCase(); switch (backendName) { case "jobmanager": LOG.info("State backend is set to heap memory (checkpoint to jobmanager)"); return MemoryStateBackend.defaultInstance(); - + case "filesystem": FsStateBackend backend = new FsStateBackendFactory().createFromConfig(flinkConfig); LOG.info("State backend is set to heap memory (checkpoints to filesystem \"" - + backend.getBasePath() + "\")"); + + backend.getBasePath() + "\")"); return backend; - + default: try { @SuppressWarnings("rawtypes") Class clazz = - Class.forName(backendName, false, userClassLoader).asSubclass(StateBackendFactory.class); + Class.forName(backendName, false, userClassLoader).asSubclass(StateBackendFactory.class); return (StateBackend) clazz.newInstance(); - } - catch (ClassNotFoundException e) { + } catch (ClassNotFoundException e) { throw new IllegalConfigurationException("Cannot find configured state backend: " + backendName); - } - catch (ClassCastException e) { + } catch (ClassCastException e) { throw new IllegalConfigurationException("The class configured under '" + - ConfigConstants.STATE_BACKEND + "' is not a valid state backend factory (" + - backendName + ')'); - } - catch (Throwable t) { + ConfigConstants.STATE_BACKEND + "' is not a valid state backend factory (" + + backendName + ')'); + } catch (Throwable t) { throw new IllegalConfigurationException("Cannot create configured state backend", t); } } } + } /** * Registers a timer. http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java index 334fd44..afeabd9 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java @@ -19,7 +19,7 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.streaming.api.state.KvStateSnapshot; +import org.apache.flink.runtime.state.KvStateSnapshot; import java.io.Serializable; import java.util.ConcurrentModificationException; http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/FileStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/FileStateBackendTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/FileStateBackendTest.java deleted file mode 100644 index 73100d1..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/FileStateBackendTest.java +++ /dev/null @@ -1,419 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.state; - -import org.apache.commons.io.FileUtils; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.FloatSerializer; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.java.typeutils.runtime.ValueSerializer; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.core.fs.Path; -import org.apache.flink.core.testutils.CommonTestUtils; -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.streaming.api.state.filesystem.FileStreamStateHandle; -import org.apache.flink.streaming.api.state.filesystem.FsStateBackend; -import org.apache.flink.types.StringValue; -import org.apache.flink.util.OperatingSystem; - -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.net.URI; -import java.util.Random; -import java.util.UUID; - -import static org.junit.Assert.*; - -public class FileStateBackendTest { - - @Test - public void testSetupAndSerialization() { - File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); - try { - final String backendDir = localFileUri(tempDir); - FsStateBackend originalBackend = new FsStateBackend(backendDir); - - assertFalse(originalBackend.isInitialized()); - assertEquals(new URI(backendDir), originalBackend.getBasePath().toUri()); - assertNull(originalBackend.getCheckpointDirectory()); - - // serialize / copy the backend - FsStateBackend backend = CommonTestUtils.createCopySerializable(originalBackend); - assertFalse(backend.isInitialized()); - assertEquals(new URI(backendDir), backend.getBasePath().toUri()); - assertNull(backend.getCheckpointDirectory()); - - // no file operations should be possible right now - try { - backend.checkpointStateSerializable("exception train rolling in", 2L, System.currentTimeMillis()); - fail("should fail with an exception"); - } catch (IllegalStateException e) { - // supreme! - } - - backend.initializeForJob(new JobID()); - assertNotNull(backend.getCheckpointDirectory()); - - File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath()); - assertTrue(checkpointDir.exists()); - assertTrue(isDirectoryEmpty(checkpointDir)); - - backend.disposeAllStateForCurrentJob(); - assertNull(backend.getCheckpointDirectory()); - - assertTrue(isDirectoryEmpty(tempDir)); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { - deleteDirectorySilently(tempDir); - } - } - - @Test - public void testSerializableState() { - File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); - try { - FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir))); - backend.initializeForJob(new JobID()); - - File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath()); - - String state1 = "dummy state"; - String state2 = "row row row your boat"; - Integer state3 = 42; - - StateHandle handle1 = backend.checkpointStateSerializable(state1, 439568923746L, System.currentTimeMillis()); - StateHandle handle2 = backend.checkpointStateSerializable(state2, 439568923746L, System.currentTimeMillis()); - StateHandle handle3 = backend.checkpointStateSerializable(state3, 439568923746L, System.currentTimeMillis()); - - assertFalse(isDirectoryEmpty(checkpointDir)); - assertEquals(state1, handle1.getState(getClass().getClassLoader())); - handle1.discardState(); - - assertFalse(isDirectoryEmpty(checkpointDir)); - assertEquals(state2, handle2.getState(getClass().getClassLoader())); - handle2.discardState(); - - assertFalse(isDirectoryEmpty(checkpointDir)); - assertEquals(state3, handle3.getState(getClass().getClassLoader())); - handle3.discardState(); - - assertTrue(isDirectoryEmpty(checkpointDir)); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { - deleteDirectorySilently(tempDir); - } - } - - @Test - public void testStateOutputStream() { - File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); - try { - FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir))); - backend.initializeForJob(new JobID()); - - File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath()); - - byte[] state1 = new byte[1274673]; - byte[] state2 = new byte[1]; - byte[] state3 = new byte[0]; - byte[] state4 = new byte[177]; - - Random rnd = new Random(); - rnd.nextBytes(state1); - rnd.nextBytes(state2); - rnd.nextBytes(state3); - rnd.nextBytes(state4); - - long checkpointId = 97231523452L; - - FsStateBackend.FsCheckpointStateOutputStream stream1 = - backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis()); - FsStateBackend.FsCheckpointStateOutputStream stream2 = - backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis()); - FsStateBackend.FsCheckpointStateOutputStream stream3 = - backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis()); - - stream1.write(state1); - stream2.write(state2); - stream3.write(state3); - - FileStreamStateHandle handle1 = stream1.closeAndGetHandle(); - FileStreamStateHandle handle2 = stream2.closeAndGetHandle(); - FileStreamStateHandle handle3 = stream3.closeAndGetHandle(); - - // use with try-with-resources - StreamStateHandle handle4; - try (StateBackend.CheckpointStateOutputStream stream4 = - backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis())) { - stream4.write(state4); - handle4 = stream4.closeAndGetHandle(); - } - - // close before accessing handle - StateBackend.CheckpointStateOutputStream stream5 = - backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis()); - stream5.write(state4); - stream5.close(); - try { - stream5.closeAndGetHandle(); - fail(); - } catch (IOException e) { - // uh-huh - } - - validateBytesInStream(handle1.getState(getClass().getClassLoader()), state1); - handle1.discardState(); - assertFalse(isDirectoryEmpty(checkpointDir)); - ensureLocalFileDeleted(handle1.getFilePath()); - - validateBytesInStream(handle2.getState(getClass().getClassLoader()), state2); - handle2.discardState(); - assertFalse(isDirectoryEmpty(checkpointDir)); - ensureLocalFileDeleted(handle2.getFilePath()); - - validateBytesInStream(handle3.getState(getClass().getClassLoader()), state3); - handle3.discardState(); - assertFalse(isDirectoryEmpty(checkpointDir)); - ensureLocalFileDeleted(handle3.getFilePath()); - - validateBytesInStream(handle4.getState(getClass().getClassLoader()), state4); - handle4.discardState(); - assertTrue(isDirectoryEmpty(checkpointDir)); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { - deleteDirectorySilently(tempDir); - } - } - - @Test - public void testKeyValueState() { - File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); - try { - FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir))); - backend.initializeForJob(new JobID()); - - File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath()); - - KvState kv = - backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null); - - assertEquals(0, kv.size()); - - // some modifications to the state - kv.setCurrentKey(1); - assertNull(kv.value()); - kv.update("1"); - assertEquals(1, kv.size()); - kv.setCurrentKey(2); - assertNull(kv.value()); - kv.update("2"); - assertEquals(2, kv.size()); - kv.setCurrentKey(1); - assertEquals("1", kv.value()); - assertEquals(2, kv.size()); - - // draw a snapshot - KvStateSnapshot snapshot1 = - kv.shapshot(682375462378L, System.currentTimeMillis()); - - // make some more modifications - kv.setCurrentKey(1); - kv.update("u1"); - kv.setCurrentKey(2); - kv.update("u2"); - kv.setCurrentKey(3); - kv.update("u3"); - - // draw another snapshot - KvStateSnapshot snapshot2 = - kv.shapshot(682375462379L, System.currentTimeMillis()); - - // validate the original state - assertEquals(3, kv.size()); - kv.setCurrentKey(1); - assertEquals("u1", kv.value()); - kv.setCurrentKey(2); - assertEquals("u2", kv.value()); - kv.setCurrentKey(3); - assertEquals("u3", kv.value()); - - // restore the first snapshot and validate it - KvState restored1 = snapshot1.restoreState(backend, - IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader()); - - assertEquals(2, restored1.size()); - restored1.setCurrentKey(1); - assertEquals("1", restored1.value()); - restored1.setCurrentKey(2); - assertEquals("2", restored1.value()); - - // restore the first snapshot and validate it - KvState restored2 = snapshot2.restoreState(backend, - IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader()); - - assertEquals(3, restored2.size()); - restored2.setCurrentKey(1); - assertEquals("u1", restored2.value()); - restored2.setCurrentKey(2); - assertEquals("u2", restored2.value()); - restored2.setCurrentKey(3); - assertEquals("u3", restored2.value()); - - snapshot1.discardState(); - assertFalse(isDirectoryEmpty(checkpointDir)); - - snapshot2.discardState(); - assertTrue(isDirectoryEmpty(checkpointDir)); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { - deleteDirectorySilently(tempDir); - } - } - - @Test - public void testRestoreWithWrongSerializers() { - File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); - try { - FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir))); - backend.initializeForJob(new JobID()); - - File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath()); - - KvState kv = - backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null); - - kv.setCurrentKey(1); - kv.update("1"); - kv.setCurrentKey(2); - kv.update("2"); - - KvStateSnapshot snapshot = - kv.shapshot(682375462378L, System.currentTimeMillis()); - - - @SuppressWarnings("unchecked") - TypeSerializer fakeIntSerializer = - (TypeSerializer) (TypeSerializer) FloatSerializer.INSTANCE; - - @SuppressWarnings("unchecked") - TypeSerializer fakeStringSerializer = - (TypeSerializer) (TypeSerializer) new ValueSerializer(StringValue.class); - - try { - snapshot.restoreState(backend, fakeIntSerializer, - StringSerializer.INSTANCE, null, getClass().getClassLoader()); - fail("should recognize wrong serializers"); - } catch (IllegalArgumentException e) { - // expected - } catch (Exception e) { - fail("wrong exception"); - } - - try { - snapshot.restoreState(backend, IntSerializer.INSTANCE, - fakeStringSerializer, null, getClass().getClassLoader()); - fail("should recognize wrong serializers"); - } catch (IllegalArgumentException e) { - // expected - } catch (Exception e) { - fail("wrong exception"); - } - - try { - snapshot.restoreState(backend, fakeIntSerializer, - fakeStringSerializer, null, getClass().getClassLoader()); - fail("should recognize wrong serializers"); - } catch (IllegalArgumentException e) { - // expected - } catch (Exception e) { - fail("wrong exception"); - } - - snapshot.discardState(); - - assertTrue(isDirectoryEmpty(checkpointDir)); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { - deleteDirectorySilently(tempDir); - } - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - private static void ensureLocalFileDeleted(Path path) { - URI uri = path.toUri(); - if ("file".equals(uri.getScheme())) { - File file = new File(uri.getPath()); - assertFalse("file not properly deleted", file.exists()); - } - else { - throw new IllegalArgumentException("not a local path"); - } - } - - private static void deleteDirectorySilently(File dir) { - try { - FileUtils.deleteDirectory(dir); - } - catch (IOException ignored) {} - } - - private static boolean isDirectoryEmpty(File directory) { - String[] nested = directory.list(); - return nested == null || nested.length == 0; - } - - private static String localFileUri(File path) { - return (OperatingSystem.isWindows() ? "file:/" : "file://") + path.getAbsolutePath(); - } - - private static void validateBytesInStream(InputStream is, byte[] data) throws IOException { - byte[] holder = new byte[data.length]; - assertEquals("not enough data", holder.length, is.read(holder)); - assertEquals("too much data", -1, is.read()); - assertArrayEquals("wrong data", data, holder); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/MemoryStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/MemoryStateBackendTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/MemoryStateBackendTest.java deleted file mode 100644 index 3410d09..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/MemoryStateBackendTest.java +++ /dev/null @@ -1,278 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.state; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.FloatSerializer; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.java.typeutils.runtime.ValueSerializer; -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.streaming.api.state.memory.MemoryStateBackend; -import org.apache.flink.types.StringValue; -import org.junit.Test; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.HashMap; - -import static org.junit.Assert.*; - -/** - * Tests for the {@link org.apache.flink.streaming.api.state.memory.MemoryStateBackend}. - */ -public class MemoryStateBackendTest { - - @Test - public void testSerializableState() { - try { - MemoryStateBackend backend = new MemoryStateBackend(); - - HashMap state = new HashMap<>(); - state.put("hey there", 2); - state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77); - - StateHandle> handle = backend.checkpointStateSerializable(state, 12, 459); - assertNotNull(handle); - - HashMap restored = handle.getState(getClass().getClassLoader()); - assertEquals(state, restored); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testOversizedState() { - try { - MemoryStateBackend backend = new MemoryStateBackend(10); - - HashMap state = new HashMap<>(); - state.put("hey there", 2); - state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77); - - try { - backend.checkpointStateSerializable(state, 12, 459); - fail("this should cause an exception"); - } - catch (IOException e) { - // now darling, isn't that exactly what we wanted? - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testStateStream() { - try { - MemoryStateBackend backend = new MemoryStateBackend(); - - HashMap state = new HashMap<>(); - state.put("hey there", 2); - state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77); - - StateBackend.CheckpointStateOutputStream os = backend.createCheckpointStateOutputStream(1, 2); - ObjectOutputStream oos = new ObjectOutputStream(os); - oos.writeObject(state); - oos.flush(); - StreamStateHandle handle = os.closeAndGetHandle(); - - assertNotNull(handle); - - ObjectInputStream ois = new ObjectInputStream(handle.getState(getClass().getClassLoader())); - assertEquals(state, ois.readObject()); - assertTrue(ois.available() <= 0); - ois.close(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testOversizedStateStream() { - try { - MemoryStateBackend backend = new MemoryStateBackend(10); - - HashMap state = new HashMap<>(); - state.put("hey there", 2); - state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77); - - StateBackend.CheckpointStateOutputStream os = backend.createCheckpointStateOutputStream(1, 2); - ObjectOutputStream oos = new ObjectOutputStream(os); - - try { - oos.writeObject(state); - oos.flush(); - os.closeAndGetHandle(); - fail("this should cause an exception"); - } - catch (IOException e) { - // oh boy! what an exception! - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testKeyValueState() { - try { - MemoryStateBackend backend = new MemoryStateBackend(); - - KvState kv = - backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null); - - assertEquals(0, kv.size()); - - // some modifications to the state - kv.setCurrentKey(1); - assertNull(kv.value()); - kv.update("1"); - assertEquals(1, kv.size()); - kv.setCurrentKey(2); - assertNull(kv.value()); - kv.update("2"); - assertEquals(2, kv.size()); - kv.setCurrentKey(1); - assertEquals("1", kv.value()); - assertEquals(2, kv.size()); - - // draw a snapshot - KvStateSnapshot snapshot1 = - kv.shapshot(682375462378L, System.currentTimeMillis()); - - // make some more modifications - kv.setCurrentKey(1); - kv.update("u1"); - kv.setCurrentKey(2); - kv.update("u2"); - kv.setCurrentKey(3); - kv.update("u3"); - - // draw another snapshot - KvStateSnapshot snapshot2 = - kv.shapshot(682375462379L, System.currentTimeMillis()); - - // validate the original state - assertEquals(3, kv.size()); - kv.setCurrentKey(1); - assertEquals("u1", kv.value()); - kv.setCurrentKey(2); - assertEquals("u2", kv.value()); - kv.setCurrentKey(3); - assertEquals("u3", kv.value()); - - // restore the first snapshot and validate it - KvState restored1 = snapshot1.restoreState(backend, - IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader()); - - assertEquals(2, restored1.size()); - restored1.setCurrentKey(1); - assertEquals("1", restored1.value()); - restored1.setCurrentKey(2); - assertEquals("2", restored1.value()); - - // restore the first snapshot and validate it - KvState restored2 = snapshot2.restoreState(backend, - IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader()); - - assertEquals(3, restored2.size()); - restored2.setCurrentKey(1); - assertEquals("u1", restored2.value()); - restored2.setCurrentKey(2); - assertEquals("u2", restored2.value()); - restored2.setCurrentKey(3); - assertEquals("u3", restored2.value()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testRestoreWithWrongSerializers() { - try { - MemoryStateBackend backend = new MemoryStateBackend(); - KvState kv = - backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null); - - kv.setCurrentKey(1); - kv.update("1"); - kv.setCurrentKey(2); - kv.update("2"); - - KvStateSnapshot snapshot = - kv.shapshot(682375462378L, System.currentTimeMillis()); - - - @SuppressWarnings("unchecked") - TypeSerializer fakeIntSerializer = - (TypeSerializer) (TypeSerializer) FloatSerializer.INSTANCE; - - @SuppressWarnings("unchecked") - TypeSerializer fakeStringSerializer = - (TypeSerializer) (TypeSerializer) new ValueSerializer(StringValue.class); - - try { - snapshot.restoreState(backend, fakeIntSerializer, - StringSerializer.INSTANCE, null, getClass().getClassLoader()); - fail("should recognize wrong serializers"); - } catch (IllegalArgumentException e) { - // expected - } catch (Exception e) { - fail("wrong exception"); - } - - try { - snapshot.restoreState(backend, IntSerializer.INSTANCE, - fakeStringSerializer, null, getClass().getClassLoader()); - fail("should recognize wrong serializers"); - } catch (IllegalArgumentException e) { - // expected - } catch (Exception e) { - fail("wrong exception"); - } - - try { - snapshot.restoreState(backend, fakeIntSerializer, - fakeStringSerializer, null, getClass().getClassLoader()); - fail("should recognize wrong serializers"); - } catch (IllegalArgumentException e) { - // expected - } catch (Exception e) { - fail("wrong exception"); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index dd76a67..ad3c838 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -28,8 +28,8 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.api.state.StateBackend; -import org.apache.flink.streaming.api.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java index ab8e551..4bd260f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java @@ -28,8 +28,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.api.state.StateBackend; -import org.apache.flink.streaming.api.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java index 81d3a69..0c708c6 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java @@ -36,8 +36,8 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.api.state.StateBackend; -import org.apache.flink.streaming.api.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index b83feca..01f95bc 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -30,8 +30,8 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.api.state.StateBackend; -import org.apache.flink.streaming.api.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask;