flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [11/17] flink git commit: [FLINK-7925] [checkpoints] Add CheckpointingOptions
Date Thu, 18 Jan 2018 17:09:28 GMT
[FLINK-7925] [checkpoints] Add CheckpointingOptions

The CheckpointingOptions class consolidates all checkpointing- and state-backend-related
settings that were previously split across different classes.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e52db8bc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e52db8bc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e52db8bc

Branch: refs/heads/master
Commit: e52db8bc411e93c245cc78a278854f2653e5f384
Parents: 03c797a
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Oct 25 17:30:14 2017 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Jan 18 18:08:03 2018 +0100

----------------------------------------------------------------------
 .../flink/client/cli/CliFrontendParser.java     |  4 +-
 .../connectors/fs/RollingSinkSecuredITCase.java |  4 +-
 .../configuration/CheckpointingOptions.java     | 92 ++++++++++++++++++++
 .../apache/flink/configuration/CoreOptions.java | 24 -----
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  4 +-
 .../checkpoint/CheckpointCoordinator.java       |  4 +-
 .../executiongraph/ExecutionGraphBuilder.java   | 50 ++++++-----
 .../job/savepoints/SavepointHandlers.java       |  6 +-
 .../JobCancellationWithSavepointHandlers.java   |  4 +-
 .../runtime/webmonitor/WebMonitorEndpoint.java  |  4 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  9 +-
 .../ExecutionGraphDeploymentTest.java           | 10 +--
 .../runtime/jobmanager/JobManagerTest.java      |  6 +-
 ...obCancellationWithSavepointHandlersTest.java |  4 +-
 .../runtime/testutils/ZooKeeperTestUtils.java   |  7 +-
 .../api/environment/CheckpointConfig.java       |  3 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |  6 +-
 .../ExternalizedCheckpointITCase.java           |  9 +-
 .../test/checkpointing/RescalingITCase.java     |  9 +-
 .../test/checkpointing/SavepointITCase.java     | 32 +++----
 .../utils/SavepointMigrationTestBase.java       | 13 +--
 .../test/classloading/ClassLoaderITCase.java    |  9 +-
 .../JobManagerHACheckpointRecoveryITCase.java   |  6 +-
 .../state/operator/restore/keyed/KeyedJob.java  |  4 +-
 .../operator/restore/unkeyed/NonKeyedJob.java   |  4 +-
 .../StatefulJobSavepointMigrationITCase.scala   | 24 ++---
 .../flink/yarn/YARNHighAvailabilityITCase.java  |  7 +-
 27 files changed, 209 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 402b4ae..e5d550f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.client.cli;
 
-import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.CheckpointingOptions;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
@@ -93,7 +93,7 @@ public class CliFrontendParser {
 	static final Option CANCEL_WITH_SAVEPOINT_OPTION = new Option(
 			"s", "withSavepoint", true, "Trigger savepoint and cancel job. The target " +
 			"directory is optional. If no directory is specified, the configured default " +
-			"directory (" + CoreOptions.SAVEPOINT_DIRECTORY.key() + ") is used.");
+			"directory (" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + ") is used.");
 
 	static {
 		HELP_OPTION.setRequired(false);

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index b76d087..b9564ee 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.streaming.connectors.fs;
 
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.security.SecurityConfiguration;
@@ -216,7 +216,7 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
 			result.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
 			result.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
 			result.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-			result.setString(CoreOptions.STATE_BACKEND, "filesystem");
+			result.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
 			result.setString(HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
 			result.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery");
 			result.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints");

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
new file mode 100644
index 0000000..1825ba3
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+/**
+ * A collection of all configuration options that relate to checkpoints
+ * and savepoints.
+ */
+public class CheckpointingOptions {
+
+	// ------------------------------------------------------------------------
+	//  general checkpoint and state backend options
+	// ------------------------------------------------------------------------
+
+	/** The state backend to be used to store and checkpoint state. */
+	public static final ConfigOption<String> STATE_BACKEND = ConfigOptions
+			.key("state.backend")
+			.noDefaultValue();
+
+	/** The maximum number of completed checkpoints to retain.*/
+	public static final ConfigOption<Integer> MAX_RETAINED_CHECKPOINTS = ConfigOptions
+			.key("state.checkpoints.num-retained")
+			.defaultValue(1);
+
+	/** Option whether the state backend should use an asynchronous snapshot method where
+	 * possible and configurable.
+	 *
+	 * <p>Some state backends may not support asynchronous snapshots, or only support
+	 * asynchronous snapshots, and ignore this option. */
+	public static final ConfigOption<Boolean> ASYNC_SNAPSHOTS = ConfigOptions
+			.key("state.backend.async")
+			.defaultValue(true);
+
+	/** Option whether the state backend should create incremental checkpoints,
+	 * if possible. For an incremental checkpoint, only a diff from the previous
+	 * checkpoint is stored, rather than the complete checkpoint state.
+	 *
+	 * <p>Some state backends may not support incremental checkpoints and ignore
+	 * this option.*/
+	public static final ConfigOption<Boolean> INCREMENTAL_CHECKPOINTS = ConfigOptions
+			.key("state.backend.incremental")
+			.defaultValue(false);
+
+	// ------------------------------------------------------------------------
+	//  Options specific to the file-system-based state backends
+	// ------------------------------------------------------------------------
+
+	/** The default directory for savepoints. Used by the state backends that write
+	 * savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */
+	public static final ConfigOption<String> SAVEPOINT_DIRECTORY = ConfigOptions
+			.key("state.savepoints.dir")
+			.noDefaultValue()
+			.withDeprecatedKeys("savepoints.state.backend.fs.dir");
+
+	/** The default directory used for checkpoints. Used by the state backends that write
+	 * checkpoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */
+	public static final ConfigOption<String> CHECKPOINTS_DIRECTORY = ConfigOptions
+			.key("state.checkpoints.dir")
+			.noDefaultValue();
+
+	/** The minimum size of state data files. All state chunks smaller than that
+	 * are stored inline in the root checkpoint metadata file. */
+	public static final ConfigOption<Integer> FS_SMALL_FILE_THRESHOLD = ConfigOptions
+			.key("state.backend.fs.memory-threshold")
+			.defaultValue(1024);
+
+	// ------------------------------------------------------------------------
+	//  Options specific to the RocksDB state backend
+	// ------------------------------------------------------------------------
+
+	/** The local directory (on the TaskManager) where RocksDB puts its files. */
+	public static final ConfigOption<String> ROCKSDB_LOCAL_DIRECTORIES = ConfigOptions
+			.key("state.backend.rocksdb.localdir")
+			.noDefaultValue()
+			.withDeprecatedKeys("state.backend.rocksdb.checkpointdir");
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 4592608..f31ad8c 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -120,30 +120,6 @@ public class CoreOptions {
 		.defaultValue(-1);
 
 	// ------------------------------------------------------------------------
-	//  checkpoints / fault tolerance
-	// ------------------------------------------------------------------------
-
-	public static final ConfigOption<String> STATE_BACKEND = ConfigOptions
-		.key("state.backend")
-		.noDefaultValue();
-
-	/** The maximum number of completed checkpoint instances to retain.*/
-	public static final ConfigOption<Integer> MAX_RETAINED_CHECKPOINTS = ConfigOptions
-		.key("state.checkpoints.num-retained")
-		.defaultValue(1);
-
-	/** The default directory for savepoints. */
-	public static final ConfigOption<String> SAVEPOINT_DIRECTORY = ConfigOptions
-		.key("state.savepoints.dir")
-		.noDefaultValue()
-		.withDeprecatedKeys("savepoints.state.backend.fs.dir");
-
-	/** The default directory used for persistent checkpoints. */
-	public static final ConfigOption<String> CHECKPOINTS_DIRECTORY = ConfigOptions
-		.key("state.checkpoints.dir")
-		.noDefaultValue();
-
-	// ------------------------------------------------------------------------
 	//  file systems
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 445c61c..7d232a5 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
@@ -249,7 +249,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 		}
 		metricFetcher = new MetricFetcher(retriever, queryServiceRetriever, scheduledExecutor, timeout);
 
-		String defaultSavepointDir = config.getString(CoreOptions.SAVEPOINT_DIRECTORY);
+		String defaultSavepointDir = config.getString(CheckpointingOptions.SAVEPOINT_DIRECTORY);
 
 		JobCancellationWithSavepointHandlers cancelWithSavepoint = new JobCancellationWithSavepointHandlers(executionGraphCache, scheduledExecutor, defaultSavepointDir);
 		RuntimeMonitorHandler triggerHandler = handler(cancelWithSavepoint.getTriggerHandler());

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 824563f..a04e34e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
@@ -208,7 +208,7 @@ public class CheckpointCoordinator {
 		if (externalizeSettings.externalizeCheckpoints() && checkpointDirectory == null) {
 			throw new IllegalStateException("CheckpointConfig says to persist periodic " +
 					"checkpoints, but no checkpoint directory has been configured. You can " +
-					"configure configure one via key '" + CoreOptions.CHECKPOINTS_DIRECTORY.key() + "'.");
+					"configure configure one via key '" + CheckpointingOptions.CHECKPOINTS_DIRECTORY.key() + "'.");
 		}
 
 		// max "in between duration" can be one year - this is to prevent numeric overflows

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 34ba3df..47948a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.metrics.MetricGroup;
@@ -50,8 +50,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
 import org.apache.flink.util.DynamicCodeLoadingException;
 import org.apache.flink.util.SerializedValue;
 
@@ -199,17 +199,17 @@ public class ExecutionGraphBuilder {
 			CheckpointIDCounter checkpointIdCounter;
 			try {
 				int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger(
-					CoreOptions.MAX_RETAINED_CHECKPOINTS);
+						CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);
 
 				if (maxNumberOfCheckpointsToRetain <= 0) {
 					// warning and use 1 as the default value if the setting in
 					// state.checkpoints.max-retained-checkpoints is not greater than 0.
 					log.warn("The setting for '{} : {}' is invalid. Using default value of {}",
-							CoreOptions.MAX_RETAINED_CHECKPOINTS.key(),
+							CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(),
 							maxNumberOfCheckpointsToRetain,
-							CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());
+							CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());
 
-					maxNumberOfCheckpointsToRetain = CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
+					maxNumberOfCheckpointsToRetain = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
 				}
 
 				completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, maxNumberOfCheckpointsToRetain, classLoader);
@@ -229,29 +229,31 @@ public class ExecutionGraphBuilder {
 					metrics);
 
 			// The default directory for externalized checkpoints
-			String externalizedCheckpointsDir = jobManagerConfig.getString(CoreOptions.CHECKPOINTS_DIRECTORY);
+			String externalizedCheckpointsDir = jobManagerConfig.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
 
-			// load the state backend for checkpoint metadata.
-			// if specified in the application, use from there, otherwise load from configuration
-			final StateBackend metadataBackend;
+			// load the state backend from the application settings
+			final StateBackend applicationConfiguredBackend;
+			final SerializedValue<StateBackend> serializedAppConfigured = snapshotSettings.getDefaultStateBackend();
 
-			final SerializedValue<StateBackend> applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend();
-			if (applicationConfiguredBackend != null) {
+			if (serializedAppConfigured == null) {
+				applicationConfiguredBackend = null;
+			}
+			else {
 				try {
-					metadataBackend = applicationConfiguredBackend.deserializeValue(classLoader);
+					applicationConfiguredBackend = serializedAppConfigured.deserializeValue(classLoader);
 				} catch (IOException | ClassNotFoundException e) {
-					throw new JobExecutionException(jobId, "Could not instantiate configured state backend.", e);
+					throw new JobExecutionException(jobId, 
+							"Could not deserialize application-defined state backend.", e);
 				}
+			}
 
-				log.info("Using application-defined state backend for checkpoint/savepoint metadata: {}.",
-					metadataBackend);
-			} else {
-				try {
-					metadataBackend = AbstractStateBackend
-							.loadStateBackendFromConfigOrCreateDefault(jobManagerConfig, classLoader, log);
-				} catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
-					throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);
-				}
+			final StateBackend rootBackend;
+			try {
+				rootBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(
+						applicationConfiguredBackend, jobManagerConfig, classLoader, log);
+			}
+			catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
+				throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);
 			}
 
 			// instantiate the user-defined checkpoint hooks
@@ -301,7 +303,7 @@ public class ExecutionGraphBuilder {
 				checkpointIdCounter,
 				completedCheckpoints,
 				externalizedCheckpointsDir,
-				metadataBackend,
+				rootBackend,
 				checkpointStatsTracker);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
index 6b437d0..ade2b54 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.rest.handler.job.savepoints;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rest.NotFoundException;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
@@ -79,7 +79,7 @@ import static java.util.Objects.requireNonNull;
  * { "target-directory": "/tmp" }
  * </pre>
  * If the body is omitted, or the field {@code target-property} is {@code null}, the default
- * savepoint directory as specified by {@link CoreOptions#SAVEPOINT_DIRECTORY} will be used.
+ * savepoint directory as specified by {@link CheckpointingOptions#SAVEPOINT_DIRECTORY} will be used.
  * As written above, the response will contain a request id, e.g.,
  * <pre>
  * { "request-id": "7d273f5a62eb4730b9dea8e833733c1e" }
@@ -146,7 +146,7 @@ public class SavepointHandlers {
 				return FutureUtils.completedExceptionally(
 					new RestHandlerException(
 						String.format("Config key [%s] is not set. Property [%s] must be provided.",
-							CoreOptions.SAVEPOINT_DIRECTORY.key(),
+							CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
 							SavepointTriggerRequestBody.FIELD_NAME_TARGET_DIRECTORY),
 						HttpResponseStatus.BAD_REQUEST));
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
index 12f27dd..93564e9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.rest.handler.legacy;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
@@ -166,7 +166,7 @@ public class JobCancellationWithSavepointHandlers {
 									throw new IllegalStateException("No savepoint directory configured. " +
 										"You can either specify a directory when triggering this savepoint or " +
 										"configure a cluster-wide default via key '" +
-										CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.");
+											CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.");
 								} else {
 									targetDirectory = defaultSavepointDirectory;
 								}

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 7ef17d8..f0cfb5e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.RestServerEndpoint;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
@@ -356,7 +356,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 			timeout,
 			responseHeaders);
 
-		final String defaultSavepointDir = clusterConfiguration.getString(CoreOptions.SAVEPOINT_DIRECTORY);
+		final String defaultSavepointDir = clusterConfiguration.getString(CheckpointingOptions.SAVEPOINT_DIRECTORY);
 
 		final SavepointHandlers savepointHandlers = new SavepointHandlers(defaultSavepointDir);
 		final SavepointHandlers.SavepointTriggerHandler savepointTriggerHandler = savepointHandlers.new SavepointTriggerHandler(

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 5f82159..0f8033d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -156,7 +156,8 @@ class JobManager(
   var futuresToComplete: Option[Seq[Future[Unit]]] = None
 
   /** The default directory for savepoints. */
-  val defaultSavepointDir: String = flinkConfiguration.getString(CoreOptions.SAVEPOINT_DIRECTORY)
+  val defaultSavepointDir: String = 
+    flinkConfiguration.getString(CheckpointingOptions.SAVEPOINT_DIRECTORY)
 
   /** The resource manager actor responsible for allocating and managing task manager resources. */
   var currentResourceManager: Option[ActorRef] = None
@@ -564,7 +565,7 @@ class JobManager(
           sender ! decorateMessage(CancellationFailure(jobId, new IllegalStateException(
             "No savepoint directory configured. You can either specify a directory " +
               "while cancelling via -s :targetDirectory or configure a cluster-wide " +
-              "default via key '" + CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.")))
+              "default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.")))
         } else {
           log.info(s"Trying to cancel job $jobId with savepoint to $targetDirectory")
 
@@ -750,13 +751,13 @@ class JobManager(
             val senderRef = sender()
             try {
               val targetDirectory : String = savepointDirectory.getOrElse(
-                flinkConfiguration.getString(CoreOptions.SAVEPOINT_DIRECTORY))
+                flinkConfiguration.getString(CheckpointingOptions.SAVEPOINT_DIRECTORY))
 
               if (targetDirectory == null) {
                 throw new IllegalStateException("No savepoint directory configured. " +
                   "You can either specify a directory when triggering this savepoint or " +
                   "configure a cluster-wide default via key '" +
-                  CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.")
+                  CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.")
               }
 
               // Do this async, because checkpoint coordinator operations can

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index e869625..12e9b5d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -479,7 +479,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 
 		final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
 
-		assertEquals(CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue(),
+		assertEquals(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue(),
 				eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
 	}
 
@@ -488,7 +488,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 
 		final int maxNumberOfCheckpointsToRetain = 10;
 		final Configuration jobManagerConfig = new Configuration();
-		jobManagerConfig.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS,
+		jobManagerConfig.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS,
 			maxNumberOfCheckpointsToRetain);
 
 		final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
@@ -554,7 +554,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 		final int negativeMaxNumberOfCheckpointsToRetain = -10;
 
 		final Configuration jobManagerConfig = new Configuration();
-		jobManagerConfig.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS,
+		jobManagerConfig.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS,
 			negativeMaxNumberOfCheckpointsToRetain);
 
 		final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
@@ -562,7 +562,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 		assertNotEquals(negativeMaxNumberOfCheckpointsToRetain,
 			eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
 
-		assertEquals(CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue(),
+		assertEquals(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue(),
 			eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index ecf2ae3..01d2346 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.queryablestate.KvStateID;
@@ -830,7 +830,7 @@ public class JobManagerTest extends TestLogger {
 
 		FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
 		Configuration config = new Configuration();
-		config.setString(CoreOptions.SAVEPOINT_DIRECTORY, defaultSavepointDir.getAbsolutePath());
+		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, defaultSavepointDir.getAbsolutePath());
 
 		ActorSystem actorSystem = null;
 		ActorGateway jobManager = null;
@@ -1157,7 +1157,7 @@ public class JobManagerTest extends TestLogger {
 
 		FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
 		Configuration config = new Configuration();
-		config.setString(CoreOptions.SAVEPOINT_DIRECTORY, defaultSavepointDir.getAbsolutePath());
+		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, defaultSavepointDir.getAbsolutePath());
 
 		ActorSystem actorSystem = null;
 		ActorGateway jobManager = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
index 9c63673..9e801a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -165,7 +165,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 			fail("Did not throw expected test Exception");
 		} catch (Exception e) {
 			IllegalStateException cause = (IllegalStateException) e.getCause();
-			assertEquals(true, cause.getMessage().contains(CoreOptions.SAVEPOINT_DIRECTORY.key()));
+			assertEquals(true, cause.getMessage().contains(CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index 9af8aaf..e688e2f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -19,12 +19,11 @@
 package org.apache.flink.runtime.testutils;
 
 import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -82,8 +81,8 @@ public class ZooKeeperTestUtils {
 		config.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, connTimeout);
 
 		// File system state backend
-		config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
-		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, fsStateHandlePath + "/checkpoints");
+		config.setString(CheckpointingOptions.STATE_BACKEND, "FILESYSTEM");
+		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, fsStateHandlePath + "/checkpoints");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, fsStateHandlePath + "/recovery");
 
 		// Akka failure detection and execution retries

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 342d4a7..87c800d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.environment;
 
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.streaming.api.CheckpointingMode;
 
@@ -267,7 +266,7 @@ public class CheckpointConfig implements java.io.Serializable {
 	 * (terminating with job status {@link JobStatus#CANCELED}).
 	 *
 	 * <p>The target directory for externalized checkpoints is configured
-	 * via {@link CoreOptions#CHECKPOINTS_DIRECTORY}.
+	 * via {@link org.apache.flink.configuration.CheckpointingOptions#CHECKPOINTS_DIRECTORY}.
 	 *
 	 * @param cleanupMode Externalized checkpoint cleanup behaviour.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 46862f2..f492d9e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobCacheService;
@@ -211,7 +211,7 @@ public class StreamTaskTest extends TestLogger {
 	@Test
 	public void testStateBackendLoadingAndClosing() throws Exception {
 		Configuration taskManagerConfig = new Configuration();
-		taskManagerConfig.setString(CoreOptions.STATE_BACKEND, MockStateBackend.class.getName());
+		taskManagerConfig.setString(CheckpointingOptions.STATE_BACKEND, MockStateBackend.class.getName());
 
 		StreamConfig cfg = new StreamConfig(new Configuration());
 		cfg.setOperatorID(new OperatorID(4711L, 42L));
@@ -236,7 +236,7 @@ public class StreamTaskTest extends TestLogger {
 	@Test
 	public void testStateBackendClosingOnFailure() throws Exception {
 		Configuration taskManagerConfig = new Configuration();
-		taskManagerConfig.setString(CoreOptions.STATE_BACKEND, MockStateBackend.class.getName());
+		taskManagerConfig.setString(CheckpointingOptions.STATE_BACKEND, MockStateBackend.class.getName());
 
 		StreamConfig cfg = new StreamConfig(new Configuration());
 		cfg.setOperatorID(new OperatorID(4711L, 42L));

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java
index 6ba5ca4..f733e6d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java
@@ -22,9 +22,9 @@ import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
@@ -150,9 +149,9 @@ public class ExternalizedCheckpointITCase extends TestLogger {
 
 		final File savepointDir = temporaryFolder.newFolder();
 
-		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
-		config.setString(CoreOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
-		config.setString(CoreOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
+		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
 
 		// ZooKeeper recovery mode?
 		if (zooKeeperQuorum != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 40b45f9..f874c8c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -28,9 +28,9 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -40,7 +40,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -127,9 +126,9 @@ public class RescalingITCase extends TestLogger {
 			final File checkpointDir = temporaryFolder.newFolder();
 			final File savepointDir = temporaryFolder.newFolder();
 
-			config.setString(CoreOptions.STATE_BACKEND, currentBackend);
-			config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
-			config.setString(CoreOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
+			config.setString(CheckpointingOptions.STATE_BACKEND, currentBackend);
+			config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+			config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
 
 			cluster = new TestingCluster(config);
 			cluster.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 75f0aa4..2711870 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -28,9 +28,9 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -55,8 +55,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestSavepoint;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseSavepoint;
@@ -122,7 +120,7 @@ public class SavepointITCase extends TestLogger {
 	private static final Logger LOG = LoggerFactory.getLogger(SavepointITCase.class);
 
 	@Rule
-	public TemporaryFolder folder = new TemporaryFolder();
+	public final TemporaryFolder folder = new TemporaryFolder();
 
 	/**
 	 * Triggers a savepoint for a job that uses the FsStateBackend. We expect
@@ -166,10 +164,10 @@ public class SavepointITCase extends TestLogger {
 			}
 
 			// Use file based checkpoints
-			config.setString(CoreOptions.STATE_BACKEND, "filesystem");
-			config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
-			config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
-			config.setString(CoreOptions.SAVEPOINT_DIRECTORY, savepointRootDir.toURI().toString());
+			config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
+			config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+			config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
+			config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointRootDir.toURI().toString());
 
 			// Start Flink
 			flink = new TestingCluster(config);
@@ -434,8 +432,7 @@ public class SavepointITCase extends TestLogger {
 			final Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
-			config.setString(CoreOptions.SAVEPOINT_DIRECTORY,
-				savepointDir.toURI().toString());
+			config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
 
 			LOG.info("Flink configuration: " + config + ".");
 
@@ -501,8 +498,7 @@ public class SavepointITCase extends TestLogger {
 		final Configuration config = new Configuration();
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
-		config.setString(CoreOptions.SAVEPOINT_DIRECTORY,
-				savepointDir.toURI().toString());
+		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
 
 		String savepointPath;
 
@@ -686,7 +682,7 @@ public class SavepointITCase extends TestLogger {
 			if (data == null) {
 				// We need this to be large, because we want to test with files
 				Random rand = new Random(getRuntimeContext().getIndexOfThisSubtask());
-				data = new byte[FsStateBackend.DEFAULT_FILE_STATE_THRESHOLD + 1];
+				data = new byte[CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue() + 1];
 				rand.nextBytes(data);
 			}
 		}
@@ -808,12 +804,10 @@ public class SavepointITCase extends TestLogger {
 			fail("Test setup failed: failed to create temporary directories.");
 		}
 
-		config.setString(CoreOptions.STATE_BACKEND, "filesystem");
-		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
-				checkpointDir.toURI().toString());
-		config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
-		config.setString(CoreOptions.SAVEPOINT_DIRECTORY,
-				savepointDir.toURI().toString());
+		config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
+		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+		config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
+		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
 
 		TestingCluster cluster = new TestingCluster(config, false);
 		String savepointPath = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index eccc7e9..b694e0c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -22,9 +22,9 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
 import org.apache.flink.runtime.client.JobListeningContext;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -33,16 +33,17 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.TestBaseUtils;
 
 import org.apache.commons.io.FileUtils;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -107,10 +108,10 @@ public class SavepointMigrationTestBase extends TestBaseUtils {
 		LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
 		LOG.info("Created savepoint directory: " + savepointDir + ".");
 
-		config.setString(CoreOptions.STATE_BACKEND, "memory");
-		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
-		config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
-		config.setString("state.savepoints.dir", savepointDir.toURI().toString());
+		config.setString(CheckpointingOptions.STATE_BACKEND, "memory");
+		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+		config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
+		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
 
 		cluster = TestBaseUtils.startCluster(config, false);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 58caf5c..b357904 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -21,9 +21,9 @@ package org.apache.flink.test.classloading;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.runtime.client.JobStatusMessage;
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFail
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
-import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
@@ -114,12 +113,12 @@ public class ClassLoaderITCase extends TestLogger {
 		parallelism = 4;
 
 		// we need to use the "filesystem" state backend to ensure FLINK-2543 is not happening again.
-		config.setString(CoreOptions.STATE_BACKEND, "filesystem");
-		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
+		config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
+		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
 				FOLDER.newFolder().getAbsoluteFile().toURI().toString());
 
 		// Savepoint path
-		config.setString(CoreOptions.SAVEPOINT_DIRECTORY,
+		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY,
 				FOLDER.newFolder().getAbsoluteFile().toURI().toString());
 
 		testCluster = new TestingCluster(config, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index cefadb4..16ea6d5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -24,9 +24,9 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -362,11 +362,11 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 		try {
 			Configuration config = new Configuration();
 
-			config.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS, retainedCheckpoints);
+			config.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, retainedCheckpoints);
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
-			config.setString(CoreOptions.CHECKPOINTS_DIRECTORY, temporaryFolder.newFolder().toString());
+			config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, temporaryFolder.newFolder().toString());
 
 			String tmpFolderString = temporaryFolder.newFolder().toString();
 			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, tmpFolderString);

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
index 076feda..c53f80f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
@@ -25,8 +25,8 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
@@ -59,7 +59,7 @@ public class KeyedJob {
 		String savepointsPath = pt.getRequired("savepoint-path");
 
 		Configuration config = new Configuration();
-		config.setString(CoreOptions.SAVEPOINT_DIRECTORY, savepointsPath);
+		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointsPath);
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
 		env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
index 1b0ed45..0b5a21f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
@@ -52,7 +52,7 @@ public class NonKeyedJob {
 		String savepointsPath = pt.getRequired("savepoint-path");
 
 		Configuration config = new Configuration();
-		config.setString(CoreOptions.SAVEPOINT_DIRECTORY, savepointsPath);
+		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointsPath);
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
 		env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
index 1e67042..435262c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
@@ -35,14 +35,14 @@ import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase
 import org.apache.flink.util.Collector
 import org.apache.flink.api.java.tuple.Tuple2
-import org.apache.flink.runtime.state.{AbstractStateBackend, FunctionInitializationContext, FunctionSnapshotContext}
+import org.apache.flink.runtime.state.{StateBackendLoader, FunctionInitializationContext, FunctionSnapshotContext}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.migration.CustomEnum.CustomEnum
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
 import org.apache.flink.streaming.util.migration.MigrationVersion
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.{Assume, Ignore, Test}
+import org.junit.{Ignore, Test}
 
 import scala.util.{Failure, Properties, Try}
 
@@ -51,17 +51,17 @@ object StatefulJobSavepointMigrationITCase {
   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
   def parameters: util.Collection[(MigrationVersion, String)] = {
     util.Arrays.asList(
-      (MigrationVersion.v1_2, AbstractStateBackend.MEMORY_STATE_BACKEND_NAME),
-      (MigrationVersion.v1_2, AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME),
-      (MigrationVersion.v1_3, AbstractStateBackend.MEMORY_STATE_BACKEND_NAME),
-      (MigrationVersion.v1_3, AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME))
+      (MigrationVersion.v1_2, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+      (MigrationVersion.v1_2, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+      (MigrationVersion.v1_3, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+      (MigrationVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME))
   }
 
   // TODO to generate savepoints for a specific Flink version / backend type,
   // TODO change these values accordingly, e.g. to generate for 1.3 with RocksDB,
-  // TODO set as (MigrationVersion.v1_3, AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME)
+  // TODO set as (MigrationVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)
   val GENERATE_SAVEPOINT_VER: MigrationVersion = MigrationVersion.v1_3
-  val GENERATE_SAVEPOINT_BACKEND_TYPE: String = AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME
+  val GENERATE_SAVEPOINT_BACKEND_TYPE: String = StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME
 
   val SCALA_VERSION: String = {
     val versionString = Properties.versionString.split(" ")(1)
@@ -86,9 +86,9 @@ class StatefulJobSavepointMigrationITCase(
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
     StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE match {
-      case AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME =>
+      case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME =>
         env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()))
-      case AbstractStateBackend.MEMORY_STATE_BACKEND_NAME =>
+      case StateBackendLoader.MEMORY_STATE_BACKEND_NAME =>
         env.setStateBackend(new MemoryStateBackend())
       case _ => throw new UnsupportedOperationException
     }
@@ -129,9 +129,9 @@ class StatefulJobSavepointMigrationITCase(
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
     migrationVersionAndBackend._2 match {
-      case AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME =>
+      case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME =>
         env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()))
-      case AbstractStateBackend.MEMORY_STATE_BACKEND_NAME =>
+      case StateBackendLoader.MEMORY_STATE_BACKEND_NAME =>
         env.setStateBackend(new MemoryStateBackend())
       case _ => throw new UnsupportedOperationException
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/e52db8bc/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 55669f1..5a5fb74 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -20,9 +20,9 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 
@@ -122,8 +121,8 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 
 		flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" +
 			zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
-			"@@" + CoreOptions.STATE_BACKEND + "=FILESYSTEM" +
-			"@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" +
+			"@@" + CheckpointingOptions.STATE_BACKEND.key() + "=FILESYSTEM" +
+			"@@" + CheckpointingOptions.CHECKPOINTS_DIRECTORY + "=" + fsStateHandlePath + "/checkpoints" +
 			"@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + fsStateHandlePath + "/recovery");
 
 		ClusterClient<ApplicationId> yarnCluster = null;


Mime
View raw message