flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From StephanEwen <...@git.apache.org>
Subject [GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
Date Wed, 10 Jan 2018 18:29:12 GMT
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5248#discussion_r160760708
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java
---
    @@ -0,0 +1,224 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.filesystem;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.configuration.CheckpointingOptions;
    +import org.apache.flink.configuration.ConfigOption;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.IllegalConfigurationException;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.state.AbstractStateBackend;
    +import org.apache.flink.runtime.state.StreamStateHandle;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +
    +/**
    + * A base class for all state backends that store their metadata (and data) in files.
    + * Examples that inherit from this are the {@link FsStateBackend}, the
    + * {@link org.apache.flink.runtime.state.memory.MemoryStateBackend MemoryStateBackend},
or the
    + * {@code RocksDBStateBackend}.
    + *
    + * <p>This class takes the base checkpoint- and savepoint directory paths, but
also accepts null
    + * for both of then, in which case creating externalized checkpoint is not possible,
and it is not
    + * possible to create a savepoint with a default path. Null is accepted to enable implementations
    + * that only optionally support default savepoints and externalized checkpoints.
    + *
    + * <h1>Checkpoint Layout</h1>
    + *
    + * <p>The state backend is configured with a base directory and persists the checkpoint
data of specific
    + * checkpoints in specific subdirectories. For example, if the base directory was set
to
    + * {@code hdfs://namenode:port/flink-checkpoints/}, the state backend will create a subdirectory
with
    + * the job's ID that will contain the actual checkpoints:
    + * ({@code hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b})
    + *
    + * <p>Each checkpoint individually will store all its files in a subdirectory that
includes the
    + * checkpoint number, such as {@code hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b/chk-17/}.
    + *
    + * <h1>Savepoint Layout</h1>
    + *
    + * <p>A savepoint that is set to be stored in path {@code hdfs://namenode:port/flink-savepoints/},
will create
    + * a subdirectory {@code savepoint-jobId(0, 6)-randomDigits} in which it stores all savepoint
data.
    + * The random digits are added as "entropy" to avoid directory collisions.
    + *
    + * <h1>Metadata and Success Files</h1>
    + *
    + * <p>A completed checkpoint writes its metadata into a file '{@value AbstractFsCheckpointStorage#METADATA_FILE_NAME}'.
    + * After that is complete (i.e., the file complete), it writes an additional file
    + * '{@value AbstractFsCheckpointStorage#SUCCESS_FILE_NAME}'.
    + *
    + * <p>Ideally that would not be necessary, and one would write the metadata file
to a temporary file and
    + * then issue a atomic (or at least constant time) rename. But some of the file systems
(like S3) do
    + * not support that: A rename is a copy process which, when failing, leaves corrupt half
written
    + * files/objects. The success file is hence needed as a signal that the
    + * '{@value AbstractFsCheckpointStorage#METADATA_FILE_NAME}'file is complete.
    + */
    +@PublicEvolving
    +public abstract class AbstractFileStateBackend extends AbstractStateBackend {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	// ------------------------------------------------------------------------
    +	//  State Backend Properties
    +	// ------------------------------------------------------------------------
    +
    +	/** The path where checkpoints will be stored, or null, if none has been configured.
*/
    +	@Nullable
    +	private final Path baseCheckpointPath;
    +
    +	/** The path where savepoints will be stored, or null, if none has been configured.
*/
    +	@Nullable
    +	private final Path baseSavepointPath;
    +
    +	/**
    +	 * Creates a backend with the given optional checkpoint- and savepoint base directories.
    +	 *
    +	 * @param baseCheckpointPath The base directory for checkpoints, or null, if none is
configured.
    +	 * @param baseSavepointPath The default directory for savepoints, or null, if none is
set.
    +	 */
    +	protected AbstractFileStateBackend(
    +			@Nullable URI baseCheckpointPath,
    +			@Nullable URI baseSavepointPath) {
    +
    +		this(baseCheckpointPath == null ? null : new Path(baseCheckpointPath),
    +				baseSavepointPath == null ? null : new Path(baseSavepointPath));
    +	}
    +
    +	/**
    +	 * Creates a backend with the given optional checkpoint- and savepoint base directories.
    +	 *
    +	 * @param baseCheckpointPath The base directory for checkpoints, or null, if none is
configured.
    +	 * @param baseSavepointPath The default directory for savepoints, or null, if none is
set.
    +	 */
    +	protected AbstractFileStateBackend(
    +			@Nullable Path baseCheckpointPath,
    +			@Nullable Path baseSavepointPath) {
    +
    +		this.baseCheckpointPath = baseCheckpointPath == null ? null : validatePath(baseCheckpointPath);
    +		this.baseSavepointPath = baseSavepointPath == null ? null : validatePath(baseSavepointPath);
    +	}
    +
    +	/**
    +	 * Creates a new backend using the given checkpoint-/savepoint directories, or the values
defined in
    +	 * the given configuration. If a checkpoint-/savepoint parameter is not null, that value
takes precedence
    +	 * over the value in the configuration. If the configuration does not specify a value,
it is possible
    +	 * that the checkpoint-/savepoint directories in the backend will be null.
    +	 *
    +	 * <p>This constructor can be used to create a backend that is based partially
on a given backend
    +	 * and partially on a configuration.
    +	 *
    +	 * @param baseCheckpointPath The checkpoint base directory to use (or null).
    +	 * @param baseSavepointPath The default savepoint directory to use (or null).
    +	 * @param configuration The configuration to read values from.
    +	 */
    +	protected AbstractFileStateBackend(
    +			@Nullable Path baseCheckpointPath,
    +			@Nullable Path baseSavepointPath,
    +			Configuration configuration) {
    +
    +		this(parameterOrConfigured(baseCheckpointPath, configuration, CheckpointingOptions.CHECKPOINTS_DIRECTORY),
    +				parameterOrConfigured(baseSavepointPath, configuration, CheckpointingOptions.SAVEPOINT_DIRECTORY));
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Gets the checkpoint base directory. Jobs will create job-specific subdirectories
    +	 * for checkpoints within this directory. May be null, if not configured.
    +	 *
    +	 * @return The checkpoint base directory
    +	 */
    +	@Nullable
    +	public Path getCheckpointPath() {
    --- End diff --
    
    Agreed, if we interpret the `RocksDBStateBacked  class as the "factory for keyed state
in RocksDB with checkpoints to files", then we can make this an `AbstractFileStateBackend`.


---

Mime
View raw message