flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject [2/7] flink git commit: [FLINK-6499] [config] Migrate state configuration options
Date Thu, 06 Jul 2017 04:23:48 GMT
[FLINK-6499] [config] Migrate state configuration options

This closes #4173.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10f50698
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10f50698
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10f50698

Branch: refs/heads/master
Commit: 10f50698ee571961f307c144e982e464f836a4e4
Parents: 78303d4
Author: zjureel <zjureel@gmail.com>
Authored: Fri Jun 23 17:59:28 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Thu Jul 6 12:11:57 2017 +0800

----------------------------------------------------------------------
 .../apache/flink/client/cli/CliFrontendParser.java    |  4 ++--
 .../apache/flink/configuration/ConfigConstants.java   | 14 +++++++++++---
 .../org/apache/flink/configuration/CoreOptions.java   | 11 +++++++++++
 .../flink/runtime/webmonitor/WebRuntimeMonitor.java   |  4 ++--
 .../JobCancellationWithSavepointHandlers.java         |  3 ++-
 .../JobCancellationWithSavepointHandlersTest.java     |  4 ++--
 .../runtime/checkpoint/CheckpointCoordinator.java     |  3 ++-
 .../runtime/executiongraph/ExecutionGraphBuilder.java |  3 +--
 .../apache/flink/runtime/jobmanager/JobManager.scala  | 12 ++++--------
 .../flink/runtime/jobmanager/JobManagerTest.java      |  5 +++--
 .../streaming/api/environment/CheckpointConfig.java   |  4 ++--
 .../flink/test/checkpointing/RescalingITCase.java     |  2 +-
 .../flink/test/checkpointing/SavepointITCase.java     |  8 ++++----
 .../flink/test/classloading/ClassLoaderITCase.java    |  2 +-
 .../JobManagerHACheckpointRecoveryITCase.java         |  2 +-
 .../test/state/operator/restore/keyed/KeyedJob.java   |  4 ++--
 .../state/operator/restore/unkeyed/NonKeyedJob.java   |  4 ++--
 17 files changed, 53 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/10f50698/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 9e54ab7..1aec391 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -19,7 +19,7 @@
 package org.apache.flink.client.cli;
 
 import org.apache.flink.client.CliFrontend;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.CoreOptions;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
@@ -94,7 +94,7 @@ public class CliFrontendParser {
 	static final Option CANCEL_WITH_SAVEPOINT_OPTION = new Option(
 			"s", "withSavepoint", true, "Trigger savepoint and cancel job. The target " +
 			"directory is optional. If no directory is specified, the configured default " +
-			"directory (" + ConfigConstants.SAVEPOINT_DIRECTORY_KEY + ") is used.");
+			"directory (" + CoreOptions.SAVEPOINT_DIRECTORY.key() + ") is used.");
 
 	static {
 		HELP_OPTION.setRequired(false);

http://git-wip-us.apache.org/repos/asf/flink/blob/10f50698/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index a7a883f..f817344 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1164,18 +1164,26 @@ public final class ConfigConstants {
 
 	// ---------------------------- Checkpoints -------------------------------
 
-	/** The default directory for savepoints. */
+	/**
+	 * The default directory for savepoints.
+	 * @deprecated Use {@link CoreOptions#SAVEPOINT_DIRECTORY} instead.
+	 */
 	@PublicEvolving
+	@Deprecated
 	public static final String SAVEPOINT_DIRECTORY_KEY = "state.savepoints.dir";
 
-	/** The default directory used for persistent checkpoints. */
+	/**
+	 * The default directory used for persistent checkpoints.
+	 * @deprecated Use {@link CoreOptions#CHECKPOINTS_DIRECTORY} instead.
+	 */
 	@PublicEvolving
+	@Deprecated
 	public static final String CHECKPOINTS_DIRECTORY_KEY = "state.checkpoints.dir";
 
 	/**
 	 * @deprecated This key was used in Flink versions <= 1.1.X with the savepoint backend
 	 * configuration. We now always use the FileSystem for savepoints. For this,
-	 * the only relevant config key is {@link #SAVEPOINT_DIRECTORY_KEY}.
+	 * the only relevant config key is {@link CoreOptions#SAVEPOINT_DIRECTORY}.
 	 */
 	@Deprecated
 	public static final String SAVEPOINT_FS_DIRECTORY_KEY = "savepoints.state.backend.fs.dir";

http://git-wip-us.apache.org/repos/asf/flink/blob/10f50698/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 8cb4123..80d610a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -59,4 +59,15 @@ public class CoreOptions {
 	public static final ConfigOption<Integer> MAX_RETAINED_CHECKPOINTS = ConfigOptions
 		.key("state.checkpoints.num-retained")
 		.defaultValue(1);
+
+	/** The default directory for savepoints. */
+	public static final ConfigOption<String> SAVEPOINT_DIRECTORY = ConfigOptions
+		.key("state.savepoints.dir")
+		.noDefaultValue()
+		.withDeprecatedKeys("savepoints.state.backend.fs.dir");
+
+	/** The default directory used for persistent checkpoints. */
+	public static final ConfigOption<String> CHECKPOINTS_DIRECTORY = ConfigOptions
+		.key("state.checkpoints.dir")
+		.noDefaultValue();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/10f50698/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index dbaaaf2..4c4f6e0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobView;
@@ -227,7 +227,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 		}
 		metricFetcher = new MetricFetcher(actorSystem, retriever, context);
 
-		String defaultSavepointDir = config.getString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, null);
+		String defaultSavepointDir = config.getString(CoreOptions.SAVEPOINT_DIRECTORY);
 
 		JobCancellationWithSavepointHandlers cancelWithSavepoint = new JobCancellationWithSavepointHandlers(currentGraphs, context, defaultSavepointDir);
 		RuntimeMonitorHandler triggerHandler = handler(cancelWithSavepoint.getTriggerHandler());

http://git-wip-us.apache.org/repos/asf/flink/blob/10f50698/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
index 3f7b824..f396a7f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -163,7 +164,7 @@ public class JobCancellationWithSavepointHandlers {
 								throw new IllegalStateException("No savepoint directory configured. " +
 										"You can either specify a directory when triggering this savepoint or " +
 										"configure a cluster-wide default via key '" +
-										ConfigConstants.SAVEPOINT_DIRECTORY_KEY + "'.");
+										CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.");
 							} else {
 								targetDirectory = defaultSavepointDirectory;
 							}

http://git-wip-us.apache.org/repos/asf/flink/blob/10f50698/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
index b0f4e8a..0ccd34c 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -161,7 +161,7 @@ public class JobCancellationWithSavepointHandlersTest {
 			fail("Did not throw expected test Exception");
 		} catch (Exception e) {
 			IllegalStateException cause = (IllegalStateException) e.getCause();
-			assertEquals(true, cause.getMessage().contains(ConfigConstants.SAVEPOINT_DIRECTORY_KEY));
+			assertEquals(true, cause.getMessage().contains(CoreOptions.SAVEPOINT_DIRECTORY.key()));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/10f50698/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 5201d43..4236216 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
@@ -202,7 +203,7 @@ public class CheckpointCoordinator {
 		if (externalizeSettings.externalizeCheckpoints() && checkpointDirectory == null) {
 			throw new IllegalStateException("CheckpointConfig says to persist periodic " +
 					"checkpoints, but no checkpoint directory has been configured. You can " +
-					"configure configure one via key '" + ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
+					"configure configure one via key '" + CoreOptions.CHECKPOINTS_DIRECTORY.key() + "'.");
 		}
 
 		// max "in between duration" can be one year - this is to prevent numeric overflows

http://git-wip-us.apache.org/repos/asf/flink/blob/10f50698/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index c8ecd3c..01ac8a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -216,8 +216,7 @@ public class ExecutionGraphBuilder {
 					metrics);
 
 			// The default directory for externalized checkpoints
-			String externalizedCheckpointsDir = jobManagerConfig.getString(
-					ConfigConstants.CHECKPOINTS_DIRECTORY_KEY, null);
+			String externalizedCheckpointsDir = jobManagerConfig.getString(CoreOptions.CHECKPOINTS_DIRECTORY);
 
 			// load the state backend for checkpoint metadata.
 			// if specified in the application, use from there, otherwise load from configuration

http://git-wip-us.apache.org/repos/asf/flink/blob/10f50698/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 9beae07..51d4159 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -174,11 +174,7 @@ class JobManager(
     JobManagerOptions.WEB_PORT.key(), -1)
 
   /** The default directory for savepoints. */
-  val defaultSavepointDir: String = ConfigurationUtil.getStringWithDeprecatedKeys(
-    flinkConfiguration,
-    ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
-    null,
-    ConfigConstants.SAVEPOINT_FS_DIRECTORY_KEY)
+  val defaultSavepointDir: String = flinkConfiguration.getString(CoreOptions.SAVEPOINT_DIRECTORY)
 
   /** The resource manager actor responsible for allocating and managing task manager resources. */
   var currentResourceManager: Option[ActorRef] = None
@@ -609,7 +605,7 @@ class JobManager(
           sender ! decorateMessage(CancellationFailure(jobId, new IllegalStateException(
             "No savepoint directory configured. You can either specify a directory " +
               "while cancelling via -s :targetDirectory or configure a cluster-wide " +
-              "default via key '" + ConfigConstants.SAVEPOINT_DIRECTORY_KEY + "'.")))
+              "default via key '" + CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.")))
         } else {
           log.info(s"Trying to cancel job $jobId with savepoint to $targetDirectory")
 
@@ -784,13 +780,13 @@ class JobManager(
             val senderRef = sender()
             try {
               val targetDirectory : String = savepointDirectory.getOrElse(
-                flinkConfiguration.getString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, null))
+                flinkConfiguration.getString(CoreOptions.SAVEPOINT_DIRECTORY))
 
               if (targetDirectory == null) {
                 throw new IllegalStateException("No savepoint directory configured. " +
                   "You can either specify a directory when triggering this savepoint or " +
                   "configure a cluster-wide default via key '" +
-                  ConfigConstants.SAVEPOINT_DIRECTORY_KEY + "'.")
+                  CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.")
               }
 
               // Do this async, because checkpoint coordinator operations can

http://git-wip-us.apache.org/repos/asf/flink/blob/10f50698/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 1a4396e..e863484 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -811,7 +812,7 @@ public class JobManagerTest extends TestLogger {
 
 		FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, defaultSavepointDir.getAbsolutePath());
+		config.setString(CoreOptions.SAVEPOINT_DIRECTORY, defaultSavepointDir.getAbsolutePath());
 
 		ActorSystem actorSystem = null;
 		ActorGateway jobManager = null;
@@ -1057,7 +1058,7 @@ public class JobManagerTest extends TestLogger {
 
 		FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, defaultSavepointDir.getAbsolutePath());
+		config.setString(CoreOptions.SAVEPOINT_DIRECTORY, defaultSavepointDir.getAbsolutePath());
 
 		ActorSystem actorSystem = null;
 		ActorGateway jobManager = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/10f50698/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index da65147..e1b566e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.environment;
 
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.streaming.api.CheckpointingMode;
 
@@ -247,7 +247,7 @@ public class CheckpointConfig implements java.io.Serializable {
 	 * (terminating with job status {@link JobStatus#CANCELED}).
 	 *
 	 * <p>The target directory for externalized checkpoints is configured
-	 * via {@link ConfigConstants#CHECKPOINTS_DIRECTORY_KEY}.
+	 * via {@link CoreOptions#CHECKPOINTS_DIRECTORY}.
 	 *
 	 * @param cleanupMode Externalized checkpoint cleanup behaviour.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/10f50698/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index a58ec51..e934c27 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -127,7 +127,7 @@ public class RescalingITCase extends TestLogger {
 
 			config.setString(CoreOptions.STATE_BACKEND, currentBackend);
 			config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
-			config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointDir.toURI().toString());
+			config.setString(CoreOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
 
 			cluster = new TestingCluster(config);
 			cluster.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/10f50698/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 1c8a429..09dfa99 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -166,7 +166,7 @@ public class SavepointITCase extends TestLogger {
 			config.setString(CoreOptions.STATE_BACKEND, "filesystem");
 			config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
 			config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
-			config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointRootDir.toURI().toString());
+			config.setString(CoreOptions.SAVEPOINT_DIRECTORY, savepointRootDir.toURI().toString());
 
 			// Start Flink
 			flink = new TestingCluster(config);
@@ -433,7 +433,7 @@ public class SavepointITCase extends TestLogger {
 			final Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
-			config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
+			config.setString(CoreOptions.SAVEPOINT_DIRECTORY,
 				savepointDir.toURI().toString());
 
 			LOG.info("Flink configuration: " + config + ".");
@@ -503,7 +503,7 @@ public class SavepointITCase extends TestLogger {
 			final Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
-			config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
+			config.setString(CoreOptions.SAVEPOINT_DIRECTORY,
 					savepointDir.toURI().toString());
 
 			LOG.info("Flink configuration: " + config + ".");
@@ -812,7 +812,7 @@ public class SavepointITCase extends TestLogger {
 		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
 				checkpointDir.toURI().toString());
 		config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
-		config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
+		config.setString(CoreOptions.SAVEPOINT_DIRECTORY,
 				savepointDir.toURI().toString());
 
 		TestingCluster cluster = new TestingCluster(config, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/10f50698/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index b1915bd..8546368 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -103,7 +103,7 @@ public class ClassLoaderITCase extends TestLogger {
 				FOLDER.newFolder().getAbsoluteFile().toURI().toString());
 
 		// Savepoint path
-		config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
+		config.setString(CoreOptions.SAVEPOINT_DIRECTORY,
 				FOLDER.newFolder().getAbsoluteFile().toURI().toString());
 
 		testCluster = new TestingCluster(config, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/10f50698/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 33c3454..f70cc1e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -359,7 +359,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
-			config.setString(ConfigConstants.CHECKPOINTS_DIRECTORY_KEY, temporaryFolder.newFolder().toString());
+			config.setString(CoreOptions.CHECKPOINTS_DIRECTORY, temporaryFolder.newFolder().toString());
 
 
 			String tmpFolderString = temporaryFolder.newFolder().toString();

http://git-wip-us.apache.org/repos/asf/flink/blob/10f50698/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
index 95d0efc..3b23c01 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
@@ -24,8 +24,8 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
@@ -57,7 +57,7 @@ public class KeyedJob {
 		String savepointsPath = pt.getRequired("savepoint-path");
 
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointsPath);
+		config.setString(CoreOptions.SAVEPOINT_DIRECTORY, savepointsPath);
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
 		env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);

http://git-wip-us.apache.org/repos/asf/flink/blob/10f50698/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
index 08a4c67..1c55681 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
@@ -21,8 +21,8 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
@@ -50,7 +50,7 @@ public class NonKeyedJob {
 		String savepointsPath = pt.getRequired("savepoint-path");
 
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointsPath);
+		config.setString(CoreOptions.SAVEPOINT_DIRECTORY, savepointsPath);
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
 		env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);


Mime
View raw message