flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [10/17] flink git commit: [FLINK-5823] [checkpoints] Make RocksDB state backend configurable
Date Thu, 18 Jan 2018 17:09:27 GMT
[FLINK-5823] [checkpoints] Make RocksDB state backend configurable


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d19525e9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d19525e9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d19525e9

Branch: refs/heads/master
Commit: d19525e90e69ddd257d47371b8ea0319fa4673c8
Parents: 1931993
Author: Stephan Ewen <sewen@apache.org>
Authored: Thu Oct 26 15:43:23 2017 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Jan 18 18:08:03 2018 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    | 159 +++++++++++++++----
 .../state/RocksDBStateBackendFactory.java       |  31 +---
 .../state/RocksDBAsyncSnapshotTest.java         |   7 +
 .../state/RocksDBStateBackendConfigTest.java    |  61 +++++--
 .../state/RocksDBStateBackendFactoryTest.java   | 139 +++++++++++++++-
 .../state/RocksDBStateBackendTest.java          |   4 +-
 6 files changed, 327 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d19525e9/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index a6552bc..6bcd595 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -20,15 +20,20 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.ConfigurableStateBackend;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.util.AbstractID;
 
@@ -67,7 +72,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * using the methods {@link #setPredefinedOptions(PredefinedOptions)} and
  * {@link #setOptions(OptionsFactory)}.
  */
-public class RocksDBStateBackend extends AbstractStateBackend {
+public class RocksDBStateBackend extends AbstractStateBackend implements ConfigurableStateBackend {
 
 	private static final long serialVersionUID = 1L;
 
@@ -83,9 +88,11 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	// -- configuration values, set in the application / configuration
 
 	/** The state backend that we use for creating checkpoint streams. */
-	private final AbstractStateBackend checkpointStreamBackend;
+	private final StateBackend checkpointStreamBackend;
 
-	/** Base paths for RocksDB directory, as configured. May be null. */
+	/** Base paths for RocksDB directory, as configured.
+	 * Null if not yet set, in which case the configuration values will be used.
+	 * The configuration defaults to the TaskManager's temp directories. */
 	@Nullable
 	private Path[] localRocksDbDirectories;
 
@@ -96,8 +103,10 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	@Nullable
 	private OptionsFactory optionsFactory;
 
-	/** True if incremental checkpointing is enabled. */
-	private boolean enableIncrementalCheckpointing;
+	/** True if incremental checkpointing is enabled.
+	 * Null if not yet set, in which case the configuration values will be used. */
+	@Nullable
+	private Boolean enableIncrementalCheckpointing;
 
 	// -- runtime values, set on TaskManager when initializing / using the backend
 
@@ -107,8 +116,9 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	/** JobID for uniquifying backup paths. */
 	private transient JobID jobId;
 
+	/** The index of the next directory to be used from {@link #initializedDbBasePaths}.*/
 	private transient int nextDirectory;
-	
+
 	/** Whether we already lazily initialized our local storage directories. */
 	private transient boolean isInitialized;
 
@@ -127,7 +137,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
 	 */
 	public RocksDBStateBackend(String checkpointDataUri) throws IOException {
-		this(new Path(checkpointDataUri).toUri(), false);
+		this(new Path(checkpointDataUri).toUri());
 	}
 
 	/**
@@ -160,7 +170,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
 	 */
 	public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
-		this(new FsStateBackend(checkpointDataUri), false);
+		this(new FsStateBackend(checkpointDataUri));
 	}
 
 	/**
@@ -188,7 +198,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	 * <p>The snapshots of the RocksDB state will be stored using the given backend's
 	 * {@link AbstractStateBackend#createStreamFactory(JobID, String) checkpoint stream}.
 	 *
-	 * @param checkpointStreamBackend The backend to store the
+	 * @param checkpointStreamBackend The backend to write the checkpoint streams to.
 	 */
 	public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) {
 		this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
@@ -202,18 +212,92 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	 * <p>The snapshots of the RocksDB state will be stored using the given backend's
 	 * {@link AbstractStateBackend#createStreamFactory(JobID, String) checkpoint stream}.
 	 *
-	 * @param checkpointStreamBackend The backend to store the
-	 * @param enableIncrementalCheckpointing True if incremental checkponting is enabled
+	 * @param checkpointStreamBackend The backend to write the checkpoint streams to.
+	 * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled.
 	 */
 	public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend, boolean enableIncrementalCheckpointing) {
 		this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
 		this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
 	}
 
+	/**
+	 * Private constructor that creates a re-configured copy of the state backend.
+	 *
+	 * @param original The state backend to re-configure.
+	 * @param config The configuration.
+	 */
+	private RocksDBStateBackend(RocksDBStateBackend original, Configuration config) {
+		// reconfigure the state backend backing the streams
+		final StateBackend originalStreamBackend = original.checkpointStreamBackend;
+		this.checkpointStreamBackend = originalStreamBackend instanceof ConfigurableStateBackend ?
+				((ConfigurableStateBackend) originalStreamBackend).configure(config) :
+				originalStreamBackend;
+
+		// configure incremental checkpoints
+		if (original.enableIncrementalCheckpointing != null) {
+			this.enableIncrementalCheckpointing = original.enableIncrementalCheckpointing;
+		}
+		else {
+			this.enableIncrementalCheckpointing =
+					config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS);
+		}
+
+		// configure local directories
+		if (original.localRocksDbDirectories != null) {
+			this.localRocksDbDirectories = original.localRocksDbDirectories;
+		}
+		else {
+			final String rocksdbLocalPaths = config.getString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES);
+			if (rocksdbLocalPaths != null) {
+				String[] directories = rocksdbLocalPaths.split(",|" + File.pathSeparator);
+
+				try {
+					setDbStoragePaths(directories);
+				}
+				catch (IllegalArgumentException e) {
+					throw new IllegalConfigurationException("Invalid configuration for RocksDB state " +
+							"backend's local storage directories: " + e.getMessage(), e);
+				}
+			}
+		}
+
+		// copy remaining settings
+		this.predefinedOptions = original.predefinedOptions;
+		this.optionsFactory = original.optionsFactory;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Reconfiguration
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a copy of this state backend that uses the values defined in the configuration
+	 * for fields that were not yet specified in this state backend.
+	 *
+	 * @param config the configuration
+	 * @return The re-configured variant of the state backend
+	 */
+	@Override
+	public RocksDBStateBackend configure(Configuration config) {
+		return new RocksDBStateBackend(this, config);
+	}
+
 	// ------------------------------------------------------------------------
 	//  State backend methods
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Gets the state backend that this RocksDB state backend uses to persist
+	 * its bytes to.
+	 *
+	 * <p>This RocksDB state backend only implements the RocksDB specific parts, it
+	 * relies on the 'CheckpointBackend' to persist the checkpoint and savepoint bytes
+	 * streams.
+	 */
+	public StateBackend getCheckpointBackend() {
+		return checkpointStreamBackend;
+	}
+
 	private void lazyInitializeForJob(
 			Environment env,
 			String operatorIdentifier) throws IOException {
@@ -314,7 +398,20 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 				numberOfKeyGroups,
 				keyGroupRange,
 				env.getExecutionConfig(),
-				enableIncrementalCheckpointing);
+				isIncrementalCheckpointsEnabled());
+	}
+
+	@Override
+	public OperatorStateBackend createOperatorStateBackend(
+			Environment env,
+			String operatorIdentifier) throws Exception {
+
+		//the default for RocksDB; eventually there can be a operator state backend based on RocksDB, too.
+		final boolean asyncSnapshots = true;
+		return new DefaultOperatorStateBackend(
+				env.getUserClassLoader(),
+				env.getExecutionConfig(),
+				asyncSnapshots);
 	}
 
 	// ------------------------------------------------------------------------
@@ -390,6 +487,18 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 		}
 	}
 
+	/**
+	 * Gets whether incremental checkpoints are enabled for this state backend.
+	 */
+	public boolean isIncrementalCheckpointsEnabled() {
+		if (enableIncrementalCheckpointing != null) {
+			return enableIncrementalCheckpointing;
+		}
+		else {
+			return CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue();
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Parametrize with RocksDB Options
 	// ------------------------------------------------------------------------
@@ -480,27 +589,17 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 		return opt;
 	}
 
-	@Override
-	public OperatorStateBackend createOperatorStateBackend(
-		Environment env,
-		String operatorIdentifier) throws Exception {
-
-		//the default for RocksDB; eventually there can be a operator state backend based on RocksDB, too.
-		final boolean asyncSnapshots = true;
-		return new DefaultOperatorStateBackend(
-			env.getUserClassLoader(),
-			env.getExecutionConfig(),
-			asyncSnapshots);
-	}
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
 
 	@Override
 	public String toString() {
-		return "RocksDB State Backend {" +
-			"isInitialized=" + isInitialized +
-			", configuredDbBasePaths=" + Arrays.toString(localRocksDbDirectories) +
-			", initializedDbBasePaths=" + Arrays.toString(initializedDbBasePaths) +
-			", checkpointStreamBackend=" + checkpointStreamBackend +
-			'}';
+		return "RocksDBStateBackend{" +
+				"checkpointStreamBackend=" + checkpointStreamBackend +
+				", localRocksDbDirectories=" + Arrays.toString(localRocksDbDirectories) +
+				", enableIncrementalCheckpointing=" + enableIncrementalCheckpointing +
+				'}';
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d19525e9/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
index de5be9a..94e15fa 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
@@ -21,13 +21,8 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StateBackendFactory;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
 import java.io.IOException;
 
 /**
@@ -36,39 +31,19 @@ import java.io.IOException;
  */
 public class RocksDBStateBackendFactory implements StateBackendFactory<RocksDBStateBackend> {
 
-	protected static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackendFactory.class);
-
 	@Override
 	public RocksDBStateBackend createFromConfig(Configuration config)
 			throws IllegalConfigurationException, IOException {
 
+		// we need to explicitly read the checkpoint directory here, because that
+		// is a required constructor parameter
 		final String checkpointDirURI = config.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
-		final String rocksdbLocalPaths = config.getString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES);
-		final boolean incrementalCheckpoints = config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS);
-
 		if (checkpointDirURI == null) {
 			throw new IllegalConfigurationException(
 				"Cannot create the RocksDB state backend: The configuration does not specify the " +
 				"checkpoint directory '" + CheckpointingOptions.CHECKPOINTS_DIRECTORY.key() + '\'');
 		}
 
-		try {
-			final Path path = new Path(checkpointDirURI);
-			final RocksDBStateBackend backend = new RocksDBStateBackend(path.toUri(), incrementalCheckpoints);
-
-			if (rocksdbLocalPaths != null) {
-				String[] directories = rocksdbLocalPaths.split(",|" + File.pathSeparator);
-				backend.setDbStoragePaths(directories);
-			}
-
-			LOG.info("State backend is set to RocksDB (configured DB storage paths {}, checkpoints to filesystem {} ) ",
-					backend.getDbStoragePaths(), path);
-
-			return backend;
-		}
-		catch (IllegalArgumentException e) {
-			throw new IllegalConfigurationException(
-					"Cannot initialize RocksDB State Backend with URI '" + checkpointDirURI + '.', e);
-		}
+		return new RocksDBStateBackend(checkpointDirURI).configure(config);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d19525e9/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 8dc504a..461c526 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -416,6 +417,12 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
 		public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
 			return blockerCheckpointStreamFactory;
 		}
+
+		@Override
+		public BlockingStreamMemoryStateBackend configure(Configuration config) {
+			// retain this instance, no re-configuration!
+			return this;
+		}
 	}
 
 	private static class AsyncCheckpointOperator

http://git-wip-us.apache.org/repos/asf/flink/blob/d19525e9/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 853d80f..eace5ff 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
@@ -30,11 +31,11 @@ import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
+import org.apache.flink.util.IOUtils;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -49,6 +50,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.startsWith;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -60,14 +62,12 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-
 /**
  * Tests for configuring the RocksDB State Backend.
  */
 @SuppressWarnings("serial")
 public class RocksDBStateBackendConfigTest {
 
-
 	@Rule
 	public TemporaryFolder tempFolder = new TemporaryFolder();
 
@@ -76,6 +76,14 @@ public class RocksDBStateBackendConfigTest {
 	// ------------------------------------------------------------------------
 
 	@Test
+	public void testDefaultsInSync() throws Exception {
+		final boolean defaultIncremental = CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue();
+
+		RocksDBStateBackend backend = new RocksDBStateBackend(tempFolder.newFolder().toURI());
+		assertEquals(defaultIncremental, backend.isIncrementalCheckpointsEnabled());
+	}
+
+	@Test
 	public void testSetDbPath() throws Exception {
 		String checkpointPath = tempFolder.newFolder().toURI().toString();
 		File testDir1 = tempFolder.newFolder();
@@ -139,8 +147,6 @@ public class RocksDBStateBackendConfigTest {
 	/**
 	 * This tests whether the RocksDB backends uses the temp directories that are provided
 	 * from the {@link Environment} when no db storage path is set.
-	 *
-	 * @throws Exception
 	 */
 	@Test
 	public void testUseTempDirectories() throws Exception {
@@ -214,7 +220,6 @@ public class RocksDBStateBackendConfigTest {
 		finally {
 			//noinspection ResultOfMethodCallIgnored
 			targetDir.setWritable(true, false);
-			FileUtils.deleteDirectory(targetDir);
 		}
 	}
 
@@ -256,8 +261,6 @@ public class RocksDBStateBackendConfigTest {
 		} finally {
 			//noinspection ResultOfMethodCallIgnored
 			targetDir1.setWritable(true, false);
-			FileUtils.deleteDirectory(targetDir1);
-			FileUtils.deleteDirectory(targetDir2);
 		}
 	}
 
@@ -329,16 +332,48 @@ public class RocksDBStateBackendConfigTest {
 	@Test
 	public void testPredefinedOptionsEnum() {
 		for (PredefinedOptions o : PredefinedOptions.values()) {
-			DBOptions opt = o.createDBOptions();
-			try {
+			try (DBOptions opt = o.createDBOptions()) {
 				assertNotNull(opt);
-			} finally {
-				opt.dispose();
 			}
 		}
 	}
 
 	// ------------------------------------------------------------------------
+	//  Reconfiguration
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testRocksDbReconfigurationCopiesExistingValues() throws Exception {
+		final FsStateBackend checkpointBackend = new FsStateBackend(tempFolder.newFolder().toURI().toString());
+		final boolean incremental = !CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue();
+
+		final RocksDBStateBackend original = new RocksDBStateBackend(checkpointBackend, incremental);
+
+		// these must not be the default options
+		final PredefinedOptions predOptions = PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM;
+		assertNotEquals(predOptions, original.getPredefinedOptions());
+		original.setPredefinedOptions(predOptions);
+
+		final OptionsFactory optionsFactory = mock(OptionsFactory.class);
+		original.setOptions(optionsFactory);
+
+		final String[] localDirs = new String[] {
+				tempFolder.newFolder().getAbsolutePath(), tempFolder.newFolder().getAbsolutePath() };
+		original.setDbStoragePaths(localDirs);
+
+		RocksDBStateBackend copy = original.configure(new Configuration());
+
+		assertEquals(original.isIncrementalCheckpointsEnabled(), copy.isIncrementalCheckpointsEnabled());
+		assertArrayEquals(original.getDbStoragePaths(), copy.getDbStoragePaths());
+		assertEquals(original.getOptions(), copy.getOptions());
+		assertEquals(original.getPredefinedOptions(), copy.getPredefinedOptions());
+
+		FsStateBackend copyCheckpointBackend = (FsStateBackend) copy.getCheckpointBackend();
+		assertEquals(checkpointBackend.getCheckpointPath(), copyCheckpointBackend.getCheckpointPath());
+		assertEquals(checkpointBackend.getSavepointPath(), copyCheckpointBackend.getSavepointPath());
+	}
+
+	// ------------------------------------------------------------------------
 	//  Contained Non-partitioned State Backend
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d19525e9/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
index 5a937c4..7612c4c 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
@@ -18,22 +18,159 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
+
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashSet;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for the RocksDBStateBackendFactory.
  */
 public class RocksDBStateBackendFactoryTest {
 
+	@Rule
+	public final TemporaryFolder tmp = new TemporaryFolder();
+
+	private final ClassLoader cl = getClass().getClassLoader();
+
+	private final String backendKey = CheckpointingOptions.STATE_BACKEND.key();
+
+	// ------------------------------------------------------------------------
+
 	@Test
 	public void testFactoryName() {
 		// construct the name such that it will not be automatically adjusted on refactorings
 		String factoryName = "org.apache.flink.contrib.streaming.state.Roc";
 		factoryName += "ksDBStateBackendFactory";
 
-		// !!! if this fails, the code in StreamTask.createStateBackend() must be adjusted
+		// !!! if this fails, the code in StateBackendLoader must be adjusted
 		assertEquals(factoryName, RocksDBStateBackendFactory.class.getName());
 	}
+
+	/**
+	 * Validates loading a file system state backend with additional parameters from the cluster configuration.
+	 */
+	@Test
+	public void testLoadFileSystemStateBackend() throws Exception {
+		final String checkpointDir = new Path(tmp.newFolder().toURI()).toString();
+		final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
+		final String localDir1 = tmp.newFolder().getAbsolutePath();
+		final String localDir2 = tmp.newFolder().getAbsolutePath();
+		final String localDirs = localDir1 + File.pathSeparator + localDir2;
+		final boolean incremental = !CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue();
+
+		final Path expectedCheckpointsPath = new Path(checkpointDir);
+		final Path expectedSavepointsPath = new Path(savepointDir);
+
+		// we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME)
+		// to guard against config-breaking changes of the name
+		final Configuration config1 = new Configuration();
+		config1.setString(backendKey, "rocksdb");
+		config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
+		config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
+		config1.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDirs);
+		config1.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental);
+
+		final Configuration config2 = new Configuration();
+		config2.setString(backendKey, RocksDBStateBackendFactory.class.getName());
+		config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
+		config2.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
+		config2.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDirs);
+		config2.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental);
+
+		StateBackend backend1 = StateBackendLoader.loadStateBackendFromConfig(config1, cl, null);
+		StateBackend backend2 = StateBackendLoader.loadStateBackendFromConfig(config2, cl, null);
+
+		assertTrue(backend1 instanceof RocksDBStateBackend);
+		assertTrue(backend2 instanceof RocksDBStateBackend);
+
+		RocksDBStateBackend fs1 = (RocksDBStateBackend) backend1;
+		RocksDBStateBackend fs2 = (RocksDBStateBackend) backend2;
+
+		AbstractFileStateBackend fs1back = (AbstractFileStateBackend) fs1.getCheckpointBackend();
+		AbstractFileStateBackend fs2back = (AbstractFileStateBackend) fs2.getCheckpointBackend();
+
+		assertEquals(expectedCheckpointsPath, fs1back.getCheckpointPath());
+		assertEquals(expectedCheckpointsPath, fs2back.getCheckpointPath());
+		assertEquals(expectedSavepointsPath, fs1back.getSavepointPath());
+		assertEquals(expectedSavepointsPath, fs2back.getSavepointPath());
+		assertEquals(incremental, fs1.isIncrementalCheckpointsEnabled());
+		assertEquals(incremental, fs2.isIncrementalCheckpointsEnabled());
+		checkPaths(fs1.getDbStoragePaths(), localDir1, localDir2);
+		checkPaths(fs2.getDbStoragePaths(), localDir1, localDir2);
+	}
+
+	/**
+	 * Validates taking the application-defined file system state backend and adding additional
+	 * parameters from the cluster configuration, but giving precedence to application-defined
+	 * parameters over configuration-defined parameters.
+	 */
+	@Test
+	public void testLoadFileSystemStateBackendMixed() throws Exception {
+		final String appCheckpointDir = new Path(tmp.newFolder().toURI()).toString();
+		final String checkpointDir = new Path(tmp.newFolder().toURI()).toString();
+		final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
+
+		final String localDir1 = tmp.newFolder().getAbsolutePath();
+		final String localDir2 = tmp.newFolder().getAbsolutePath();
+		final String localDir3 = tmp.newFolder().getAbsolutePath();
+		final String localDir4 = tmp.newFolder().getAbsolutePath();
+
+		final boolean incremental = !CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue();
+
+		final Path expectedCheckpointsPath = new Path(appCheckpointDir);
+		final Path expectedSavepointsPath = new Path(savepointDir);
+
+		final RocksDBStateBackend backend = new RocksDBStateBackend(appCheckpointDir, incremental);
+		backend.setDbStoragePaths(localDir1, localDir2);
+
+		final Configuration config = new Configuration();
+		config.setString(backendKey, "jobmanager"); // this should not be picked up
+		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); // this should not be picked up
+		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
+		config.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, !incremental);  // this should not be picked up
+		config.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDir3 + ":" + localDir4);  // this should not be picked up
+
+		final StateBackend loadedBackend =
+				StateBackendLoader.fromApplicationOrConfigOrDefault(backend, config, cl, null);
+		assertTrue(loadedBackend instanceof RocksDBStateBackend);
+
+		final RocksDBStateBackend loadedRocks = (RocksDBStateBackend) loadedBackend;
+
+		assertEquals(incremental, loadedRocks.isIncrementalCheckpointsEnabled());
+		checkPaths(loadedRocks.getDbStoragePaths(), localDir1, localDir2);
+
+		AbstractFileStateBackend fsBackend = (AbstractFileStateBackend) loadedRocks.getCheckpointBackend();
+		assertEquals(expectedCheckpointsPath, fsBackend.getCheckpointPath());
+		assertEquals(expectedSavepointsPath, fsBackend.getSavepointPath());
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static void checkPaths(String[] pathsArray, String... paths) {
+		assertNotNull(pathsArray);
+		assertNotNull(paths);
+
+		assertEquals(pathsArray.length, paths.length);
+
+		HashSet<String> pathsSet = new HashSet<>(Arrays.asList(pathsArray));
+
+		for (String path : paths) {
+			assertTrue(pathsSet.contains(path));
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d19525e9/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index e1be744..39724c6 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -110,10 +110,10 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 	public boolean enableIncrementalCheckpointing;
 
 	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
+	public final TemporaryFolder tempFolder = new TemporaryFolder();
 
 	// Store it because we need it for the cleanup test.
-	String dbPath;
+	private String dbPath;
 
 	@Override
 	protected RocksDBStateBackend getStateBackend() throws IOException {


Mime
View raw message