flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [4/8] flink git commit: [FLINK-4768] [core] Migrate high-availability configuration parameters to ConfigOptions
Date Thu, 13 Oct 2016 15:40:14 GMT
[FLINK-4768] [core] Migrate high-availability configuration parameters to ConfigOptions

This closes #2607


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c8dc074a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c8dc074a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c8dc074a

Branch: refs/heads/flip-6
Commit: c8dc074a1899fa0f7d6ce7c6377c5e3d30159c18
Parents: d71a09c
Author: Stephan Ewen <sewen@apache.org>
Authored: Sat Oct 8 01:41:02 2016 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Oct 13 16:25:49 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/cli/DefaultCLI.java |   5 +-
 .../configuration/HighAvailabilityOptions.java  | 139 +++++++++++++++++++
 .../webmonitor/WebRuntimeMonitorITCase.java     |   7 +-
 .../flink/runtime/blob/FileSystemBlobStore.java |  22 ++-
 .../jobmanager/HighAvailabilityMode.java        |   8 +-
 .../flink/runtime/security/SecurityContext.java |  11 +-
 .../flink/runtime/util/ZooKeeperUtils.java      |  68 +++------
 .../zookeeper/FlinkZooKeeperQuorumPeer.java     |  46 +++---
 .../flink/runtime/jobmanager/JobManager.scala   |  14 +-
 .../flink/runtime/blob/BlobRecoveryITCase.java  |   5 +-
 .../BlobLibraryCacheRecoveryITCase.java         |   5 +-
 .../jobmanager/HighAvailabilityModeTest.java    |  13 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |   5 +-
 .../ZooKeeperLeaderElectionTest.java            |  25 ++--
 .../ZooKeeperLeaderRetrievalTest.java           |  15 +-
 .../runtime/testutils/ZooKeeperTestUtils.java   |  13 +-
 .../flink/runtime/util/ZooKeeperUtilTest.java   |   3 +-
 .../zookeeper/ZooKeeperTestEnvironment.java     |  10 +-
 .../runtime/testingUtils/TestingUtils.scala     |  13 +-
 .../connectors/fs/RollingSinkSecuredITCase.java |   5 +-
 .../flink/test/util/SecureTestEnvironment.java  |   3 +-
 .../apache/flink/test/util/TestBaseUtils.java   |   3 +-
 .../flink/test/recovery/ChaosMonkeyITCase.java  |   3 +-
 ...agerHAProcessFailureBatchRecoveryITCase.java |   5 +-
 ...CliFrontendYarnAddressConfigurationTest.java |  11 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java  |   3 +-
 .../yarn/AbstractYarnClusterDescriptor.java     |   5 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |   3 +-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |   6 +-
 29 files changed, 302 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index 18fa323..8f79403 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -19,11 +19,12 @@ package org.apache.flink.client.cli;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
+
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
 import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 
 import java.net.InetSocketAddress;
 
@@ -64,7 +65,7 @@ public class DefaultCLI implements CustomCommandLine<StandaloneClusterClient> {
 
 		if (commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) {
 			String zkNamespace = commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt());
-			config.setString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
+			config.setString(HighAvailabilityOptions.HA_CLUSTER_ID.key(), zkNamespace);
 		}
 
 		StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
new file mode 100644
index 0000000..1ee988a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to high-availability settings.
+ */
+@PublicEvolving
+public class HighAvailabilityOptions {
+
+	// ------------------------------------------------------------------------
+	//  Required High Availability Options
+	// ------------------------------------------------------------------------
+
+	/** 
+	 * Defines high-availability mode used for the cluster execution.
+	 * A value of "NONE" signals no highly available setup.
+	 * To enable high-availability, set this mode to "ZOOKEEPER".
+	 */
+	public static final ConfigOption<String> HA_MODE = 
+			key("high-availability")
+			.defaultValue("NONE")
+			.withDeprecatedKeys("recovery.mode");
+
+	/**
+	 * The ID of the Flink cluster, used to separate multiple Flink clusters from each other.
+	 * Needs to be set for standalone clusters, but is automatically inferred in YARN and Mesos.
+	 */
+	public static final ConfigOption<String> HA_CLUSTER_ID = 
+			key("high-availability.cluster-id")
+			.defaultValue("/default")
+			.withDeprecatedKeys("high-availability.zookeeper.path.namespace", "recovery.zookeeper.path.namespace");
+
+	/**
+	 * File system path (URI) where Flink persists metadata in high-availability setups
+	 */
+	public static final ConfigOption<String> HA_STORAGE_PATH =
+			key("high-availability.storageDir")
+			.noDefaultValue()
+			.withDeprecatedKeys("high-availability.zookeeper.storageDir", "recovery.zookeeper.storageDir");
+
+	/**
+	 * The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.
+	 */
+	public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM =
+			key("high-availability.zookeeper.quorum")
+			.noDefaultValue()
+			.withDeprecatedKeys("recovery.zookeeper.quorum");
+	
+
+	// ------------------------------------------------------------------------
+	//  Recovery Options
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Optional port (range) used by the job manager in high-availability mode.
+	 */
+	public static final ConfigOption<String> HA_JOB_MANAGER_PORT_RANGE = 
+			key("high-availability.jobmanager.port")
+			.defaultValue("0")
+			.withDeprecatedKeys("recovery.jobmanager.port");
+
+	/**
+	 * The time a JobManager waits after a failover before it recovers the current jobs.
+	 */
+	public static final ConfigOption<String> HA_JOB_DELAY = 
+			key("high-availability.job.delay")
+			.noDefaultValue()
+			.withDeprecatedKeys("recovery.job.delay");
+
+	// ------------------------------------------------------------------------
+	//  ZooKeeper Options
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The root path under which Flink stores its entries in ZooKeeper
+	 */
+	public static final ConfigOption<String> HA_ZOOKEEPER_ROOT =
+			key("high-availability.zookeeper.path.root")
+			.defaultValue("/flink")
+			.withDeprecatedKeys("recovery.zookeeper.path.root");
+
+	// ------------------------------------------------------------------------
+	//  ZooKeeper Client Settings
+	// ------------------------------------------------------------------------
+
+	public static final ConfigOption<Integer> ZOOKEEPER_SESSION_TIMEOUT = 
+			key("high-availability.zookeeper.client.session-timeout")
+			.defaultValue(60000)
+			.withDeprecatedKeys("recovery.zookeeper.client.session-timeout");
+
+	public static final ConfigOption<Integer> ZOOKEEPER_CONNECTION_TIMEOUT =
+			key("high-availability.zookeeper.client.connection-timeout")
+			.defaultValue(15000)
+			.withDeprecatedKeys("recovery.zookeeper.client.connection-timeout");
+
+	public static final ConfigOption<Integer> ZOOKEEPER_RETRY_WAIT = 
+			key("high-availability.zookeeper.client.retry-wait")
+			.defaultValue(5000)
+			.withDeprecatedKeys("recovery.zookeeper.client.retry-wait");
+
+	public static final ConfigOption<Integer> ZOOKEEPER_MAX_RETRY_ATTEMPTS = 
+			key("high-availability.zookeeper.client.max-retry-attempts")
+			.defaultValue(3)
+			.withDeprecatedKeys("recovery.zookeeper.client.max-retry-attempts");
+
+	public static final ConfigOption<Boolean> ZOOKEEPER_SASL_DISABLE = 
+			key("zookeeper.sasl.disable")
+			.defaultValue(true);
+
+	public static final ConfigOption<String> ZOOKEEPER_SASL_SERVICE_NAME = 
+			key("zookeeper.sasl.service-name")
+			.noDefaultValue();
+
+	// ------------------------------------------------------------------------
+
+	/** Not intended to be instantiated */
+	private HighAvailabilityOptions() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 54c5e76..1ae776c 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -24,6 +24,7 @@ import io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.curator.test.TestingServer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
@@ -237,7 +238,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 				followingClient.sendGetRequest("index.html", deadline.timeLeft());
 				response = followingClient.getNextResponse(deadline.timeLeft());
 				assertEquals(HttpResponseStatus.TEMPORARY_REDIRECT, response.getStatus());
-				assertTrue(response.getLocation().contains("" + leadingWebMonitor.getServerPort()));
+				assertTrue(response.getLocation().contains(String.valueOf(leadingWebMonitor.getServerPort())));
 
 				// Kill the leader
 				leadingSystem.shutdown();
@@ -296,8 +297,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 			final Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
 			config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
-			config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER");
-			config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zooKeeper.getConnectString());
+			config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+			config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeper.getConnectString());
 
 			actorSystem = AkkaUtils.createDefaultActorSystem();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index ee189d4..deba738 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -19,14 +19,17 @@
 package org.apache.flink.runtime.blob;
 
 import com.google.common.io.Files;
+
+import org.apache.commons.lang3.StringUtils;
+
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.ConfigurationUtil;
 import org.apache.flink.util.IOUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,16 +55,11 @@ class FileSystemBlobStore implements BlobStore {
 	private final String basePath;
 
 	FileSystemBlobStore(Configuration config) throws IOException {
-		String storagePath = ConfigurationUtil.getStringWithDeprecatedKeys(
-				config,
-				ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH,
-				null,
-				ConfigConstants.ZOOKEEPER_RECOVERY_PATH);
-
-		if (storagePath == null) {
-			throw new IllegalConfigurationException(String.format("Missing configuration for " +
-					"ZooKeeper file system path. Please specify via " +
-					"'%s' key.", ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH));
+		String storagePath = config.getValue(HighAvailabilityOptions.HA_STORAGE_PATH);
+
+		if (storagePath == null || StringUtils.isBlank(storagePath)) {
+			throw new IllegalConfigurationException("Missing high-availability storage path for metadata." +
+					" Specify via configuration key '" + HighAvailabilityOptions.HA_STORAGE_PATH + "'.");
 		}
 
 		this.basePath = storagePath + "/blob";

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
index 087ad3b..fa2db48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.ConfigurationUtil;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 
 /**
  * High availability mode for Flink's cluster execution. Currently supported modes are:
@@ -43,11 +43,7 @@ public enum HighAvailabilityMode {
 	 * configured.
 	 */
 	public static HighAvailabilityMode fromConfig(Configuration config) {
-		String haMode = ConfigurationUtil.getStringWithDeprecatedKeys(
-				config,
-				ConfigConstants.HA_MODE,
-				null,
-				ConfigConstants.RECOVERY_MODE);
+		String haMode = config.getValue(HighAvailabilityOptions.HA_MODE);
 
 		if (haMode == null) {
 			return HighAvailabilityMode.NONE;

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
index be6611f..67dd78c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
@@ -182,9 +183,9 @@ public class SecurityContext {
 		//with pseudo JAAS configuration file if SASL auth is enabled for ZK
 		System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, "");
 
-		boolean disableSaslClient = configuration.getBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE,
-				ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE);
-		if(disableSaslClient) {
+		boolean disableSaslClient = configuration.getBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE);
+
+		if (disableSaslClient) {
 			LOG.info("SASL client auth for ZK will be disabled");
 			//SASL auth is disabled by default but will be enabled if specified in configuration
 			System.setProperty(ZOOKEEPER_SASL_CLIENT,"false");
@@ -212,8 +213,8 @@ public class SecurityContext {
 		System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath());
 		System.setProperty(ZOOKEEPER_SASL_CLIENT, "true");
 
-		String zkSaslServiceName = configuration.getString(ConfigConstants.ZOOKEEPER_SASL_SERVICE_NAME, null);
-		if(!StringUtils.isBlank(zkSaslServiceName)) {
+		String zkSaslServiceName = configuration.getValue(HighAvailabilityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
+		if (!StringUtils.isBlank(zkSaslServiceName)) {
 			LOG.info("ZK SASL service name: {} is provided in the configuration", zkSaslServiceName);
 			System.setProperty(ZOOKEEPER_SASL_CLIENT_USERNAME,zkSaslServiceName);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 5e69875..137a85b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -18,12 +18,14 @@
 
 package org.apache.flink.runtime.util;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -57,53 +59,25 @@ public class ZooKeeperUtils {
 	 * @return {@link CuratorFramework} instance
 	 */
 	public static CuratorFramework startCuratorFramework(Configuration configuration) {
-		String zkQuorum = ConfigurationUtil.getStringWithDeprecatedKeys(
-				configuration,
-				ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
-				null,
-				ConfigConstants.ZOOKEEPER_QUORUM_KEY);
+		String zkQuorum = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM);
 
-		if (zkQuorum == null || zkQuorum.equals("")) {
+		if (zkQuorum == null || StringUtils.isBlank(zkQuorum)) {
 			throw new RuntimeException("No valid ZooKeeper quorum has been specified. " +
 					"You can specify the quorum via the configuration key '" +
-					ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY + "'.");
+					HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM.key() + "'.");
 		}
 
-		int sessionTimeout = ConfigurationUtil.getIntegerWithDeprecatedKeys(
-				configuration,
-				ConfigConstants.HA_ZOOKEEPER_SESSION_TIMEOUT,
-				ConfigConstants.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT,
-				ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT);
+		int sessionTimeout = configuration.getInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT);
 
-		int connectionTimeout = ConfigurationUtil.getIntegerWithDeprecatedKeys(
-				configuration,
-				ConfigConstants.HA_ZOOKEEPER_CONNECTION_TIMEOUT,
-				ConfigConstants.DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT,
-				ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT);
+		int connectionTimeout = configuration.getInteger(HighAvailabilityOptions.ZOOKEEPER_CONNECTION_TIMEOUT);
 
-		int retryWait = ConfigurationUtil.getIntegerWithDeprecatedKeys(
-				configuration,
-				ConfigConstants.HA_ZOOKEEPER_RETRY_WAIT,
-				ConfigConstants.DEFAULT_ZOOKEEPER_RETRY_WAIT,
-				ConfigConstants.ZOOKEEPER_RETRY_WAIT);
+		int retryWait = configuration.getInteger(HighAvailabilityOptions.ZOOKEEPER_RETRY_WAIT);
 
-		int maxRetryAttempts = ConfigurationUtil.getIntegerWithDeprecatedKeys(
-				configuration,
-				ConfigConstants.HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS,
-				ConfigConstants.DEFAULT_ZOOKEEPER_MAX_RETRY_ATTEMPTS,
-				ConfigConstants.ZOOKEEPER_MAX_RETRY_ATTEMPTS);
+		int maxRetryAttempts = configuration.getInteger(HighAvailabilityOptions.ZOOKEEPER_MAX_RETRY_ATTEMPTS);
 
-		String root = ConfigurationUtil.getStringWithDeprecatedKeys(
-				configuration,
-				ConfigConstants.HA_ZOOKEEPER_DIR_KEY,
-				ConfigConstants.DEFAULT_ZOOKEEPER_DIR_KEY,
-				ConfigConstants.ZOOKEEPER_DIR_KEY);
+		String root = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT);
 
-		String namespace = ConfigurationUtil.getStringWithDeprecatedKeys(
-				configuration,
-				ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY,
-				ConfigConstants.DEFAULT_ZOOKEEPER_NAMESPACE_KEY,
-				ConfigConstants.ZOOKEEPER_NAMESPACE_KEY);
+		String namespace = configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
 
 		String rootWithNamespace = generateZookeeperPath(root, namespace);
 
@@ -138,13 +112,9 @@ public class ZooKeeperUtils {
 	public static String getZooKeeperEnsemble(Configuration flinkConf)
 			throws IllegalConfigurationException {
 
-		String zkQuorum = ConfigurationUtil.getStringWithDeprecatedKeys(
-				flinkConf,
-				ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
-				"",
-				ConfigConstants.ZOOKEEPER_QUORUM_KEY);
+		String zkQuorum = flinkConf.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM);
 
-		if (zkQuorum == null || zkQuorum.equals("")) {
+		if (zkQuorum == null || StringUtils.isBlank(zkQuorum)) {
 			throw new IllegalConfigurationException("No ZooKeeper quorum specified in config.");
 		}
 
@@ -367,15 +337,11 @@ public class ZooKeeperUtils {
 			Configuration configuration,
 			String prefix) throws IOException {
 
-		String rootPath = ConfigurationUtil.getStringWithDeprecatedKeys(
-				configuration,
-				ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH,
-				"",
-				ConfigConstants.ZOOKEEPER_RECOVERY_PATH);
+		String rootPath = configuration.getValue(HighAvailabilityOptions.HA_STORAGE_PATH);
 
-		if (rootPath.equals("")) {
-			throw new IllegalConfigurationException("Missing recovery path. Specify via " +
-				"configuration key '" + ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH + "'.");
+		if (rootPath == null || StringUtils.isBlank(rootPath)) {
+			throw new IllegalConfigurationException("Missing high-availability storage path for metadata." +
+					" Specify via configuration key '" + HighAvailabilityOptions.HA_STORAGE_PATH + "'.");
 		} else {
 			return new FileSystemStateStorageHelper<T>(rootPath, prefix);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
index 9fba529..c4140c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.zookeeper;
 
 import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+
 import org.apache.zookeeper.server.ServerConfig;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooKeeperServerMain;
@@ -47,8 +47,25 @@ import java.util.UUID;
  */
 public class FlinkZooKeeperQuorumPeer {
 
+	/** ZooKeeper default client port. */
+	public static final int DEFAULT_ZOOKEEPER_CLIENT_PORT = 2181;
+
+	/** ZooKeeper default init limit. */
+	public static final int DEFAULT_ZOOKEEPER_INIT_LIMIT = 10;
+
+	/** ZooKeeper default sync limit. */
+	public static final int DEFAULT_ZOOKEEPER_SYNC_LIMIT = 5;
+
+	/** ZooKeeper default peer port. */
+	public static final int DEFAULT_ZOOKEEPER_PEER_PORT = 2888;
+
+	/** ZooKeeper default leader port. */
+	public static final int DEFAULT_ZOOKEEPER_LEADER_PORT = 3888;
+
 	private static final Logger LOG = LoggerFactory.getLogger(FlinkZooKeeperQuorumPeer.class);
 
+	// ------------------------------------------------------------------------
+
 	public static void main(String[] args) {
 		try {
 			// startup checks and logging
@@ -67,6 +84,8 @@ public class FlinkZooKeeperQuorumPeer {
 		}
 	}
 
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Runs a ZooKeeper {@link QuorumPeer} if further peers are configured or a single
 	 * {@link ZooKeeperServer} if no further peers are configured.
@@ -120,26 +139,23 @@ public class FlinkZooKeeperQuorumPeer {
 	private static void setRequiredProperties(Properties zkProps) {
 		// Set default client port
 		if (zkProps.getProperty("clientPort") == null) {
-			int clientPort = ConfigConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT;
-			zkProps.setProperty("clientPort", String.valueOf(clientPort));
+			zkProps.setProperty("clientPort", String.valueOf(DEFAULT_ZOOKEEPER_CLIENT_PORT));
 
-			LOG.warn("No 'clientPort' configured. Set to '{}'.", clientPort);
+			LOG.warn("No 'clientPort' configured. Set to '{}'.", DEFAULT_ZOOKEEPER_CLIENT_PORT);
 		}
 
 		// Set default init limit
 		if (zkProps.getProperty("initLimit") == null) {
-			int initLimit = ConfigConstants.DEFAULT_ZOOKEEPER_INIT_LIMIT;
-			zkProps.setProperty("initLimit", String.valueOf(initLimit));
+			zkProps.setProperty("initLimit", String.valueOf(DEFAULT_ZOOKEEPER_INIT_LIMIT));
 
-			LOG.warn("No 'initLimit' configured. Set to '{}'.", initLimit);
+			LOG.warn("No 'initLimit' configured. Set to '{}'.", DEFAULT_ZOOKEEPER_INIT_LIMIT);
 		}
 
 		// Set default sync limit
 		if (zkProps.getProperty("syncLimit") == null) {
-			int syncLimit = ConfigConstants.DEFAULT_ZOOKEEPER_SYNC_LIMIT;
-			zkProps.setProperty("syncLimit", String.valueOf(syncLimit));
+			zkProps.setProperty("syncLimit", String.valueOf(DEFAULT_ZOOKEEPER_SYNC_LIMIT));
 
-			LOG.warn("No 'syncLimit' configured. Set to '{}'.", syncLimit);
+			LOG.warn("No 'syncLimit' configured. Set to '{}'.", DEFAULT_ZOOKEEPER_SYNC_LIMIT);
 		}
 
 		// Set default data dir
@@ -152,8 +168,8 @@ public class FlinkZooKeeperQuorumPeer {
 			LOG.warn("No 'dataDir' configured. Set to '{}'.", dataDir);
 		}
 
-		int peerPort = ConfigConstants.DEFAULT_ZOOKEEPER_PEER_PORT;
-		int leaderPort = ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PORT;
+		int peerPort = DEFAULT_ZOOKEEPER_PEER_PORT;
+		int leaderPort = DEFAULT_ZOOKEEPER_LEADER_PORT;
 
 		// Set peer and leader ports if none given, because ZooKeeper complains if multiple
 		// servers are configured, but no ports are given.
@@ -220,12 +236,8 @@ public class FlinkZooKeeperQuorumPeer {
 		
 		// Write myid to file. We use a File Writer, because that properly propagates errors,
 		// while the PrintWriter swallows errors
-		FileWriter writer = new FileWriter(new File(dataDir, "myid"));
-		try {
+		try (FileWriter writer = new FileWriter(new File(dataDir, "myid"))) {
 			writer.write(String.valueOf(id));
 		}
-		finally {
-			writer.close();
-		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index e90f2d2..be820ae 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -31,7 +31,7 @@ import akka.pattern.ask
 import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.time.Time
-import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration, HighAvailabilityOptions}
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.metrics.{Gauge, MetricGroup}
@@ -2367,9 +2367,7 @@ object JobManager {
         configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
 
         // The port range of allowed job manager ports or 0 for random
-        configuration.getString(
-          ConfigConstants.RECOVERY_JOB_MANAGER_PORT,
-          ConfigConstants.DEFAULT_HA_JOB_MANAGER_PORT)
+        configuration.getValue(HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE)
       }
       else {
         LOG.info("Starting JobManager without high-availability")
@@ -2501,11 +2499,7 @@ object JobManager {
 
     val savepointStore = SavepointStoreFactory.createFromConfig(configuration)
 
-    val jobRecoveryTimeoutStr = ConfigurationUtil.getStringWithDeprecatedKeys(
-      configuration,
-      ConfigConstants.HA_JOB_DELAY,
-      null,
-      ConfigConstants.RECOVERY_JOB_DELAY)
+    val jobRecoveryTimeoutStr = configuration.getValue(HighAvailabilityOptions.HA_JOB_DELAY)
 
     val jobRecoveryTimeout = if (jobRecoveryTimeoutStr == null || jobRecoveryTimeoutStr.isEmpty) {
       timeout
@@ -2515,7 +2509,7 @@ object JobManager {
       } catch {
         case n: NumberFormatException =>
           throw new Exception(
-            s"Invalid config value for ${ConfigConstants.HA_JOB_DELAY}: " +
+            s"Invalid config value for ${HighAvailabilityOptions.HA_JOB_DELAY.key()}: " +
               s"$jobRecoveryTimeoutStr. Value must be a valid duration (such as '10 s' or '1 min')")
       }
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index 8464d68..8ba20c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -22,6 +22,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.junit.After;
 import org.junit.Before;
@@ -68,9 +69,9 @@ public class BlobRecoveryITCase {
 
 		try {
 			Configuration config = new Configuration();
-			config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER");
+			config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 			config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
-			config.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, recoveryDir.getPath());
+			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, recoveryDir.getPath());
 
 			for (int i = 0; i < server.length; i++) {
 				server[i] = new BlobServer(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index f6bed56..f6cdf09 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.execution.librarycache;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -63,9 +64,9 @@ public class BlobLibraryCacheRecoveryITCase {
 
 		try {
 			Configuration config = new Configuration();
-			config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER");
+			config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 			config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
-			config.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath());
+			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath());
 
 			for (int i = 0; i < server.length; i++) {
 				server[i] = new BlobServer(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
index 04c0e48..91fb514 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
@@ -20,7 +20,8 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -42,7 +43,7 @@ public class HighAvailabilityModeTest {
 		assertEquals(DEFAULT_HA_MODE, HighAvailabilityMode.fromConfig(config));
 
 		// Check not equals default
-		config.setString(ConfigConstants.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
+		config.setString(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
 		assertEquals(HighAvailabilityMode.ZOOKEEPER, HighAvailabilityMode.fromConfig(config));
 	}
 
@@ -54,16 +55,16 @@ public class HighAvailabilityModeTest {
 		Configuration config = new Configuration();
 
 		// Check mapping of old default to new default
-		config.setString(ConfigConstants.RECOVERY_MODE, ConfigConstants.DEFAULT_RECOVERY_MODE);
+		config.setString("recovery.mode", ConfigConstants.DEFAULT_RECOVERY_MODE);
 		assertEquals(DEFAULT_HA_MODE, HighAvailabilityMode.fromConfig(config));
 
 		// Check deprecated config
-		config.setString(ConfigConstants.RECOVERY_MODE, HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
+		config.setString("recovery.mode", HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
 		assertEquals(HighAvailabilityMode.ZOOKEEPER, HighAvailabilityMode.fromConfig(config));
 
 		// Check precedence over deprecated config
-		config.setString(ConfigConstants.HA_MODE, HighAvailabilityMode.NONE.name().toLowerCase());
-		config.setString(ConfigConstants.RECOVERY_MODE, HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
+		config.setString("high-availability", HighAvailabilityMode.NONE.name().toLowerCase());
+		config.setString("recovery.mode", HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
 
 		assertEquals(HighAvailabilityMode.NONE, HighAvailabilityMode.fromConfig(config));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 360588d..5b12eee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -26,6 +26,7 @@ import akka.testkit.JavaTestKit;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
@@ -131,8 +132,8 @@ public class JobManagerHARecoveryTest {
 		ActorRef jobManager = null;
 		ActorRef taskManager = null;
 
-		flinkConfiguration.setString(ConfigConstants.HA_MODE, "zookeeper");
-		flinkConfiguration.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, temporaryFolder.newFolder().toString());
+		flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+		flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
 		flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index e20985b..1f1eb62 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -27,6 +27,7 @@ import org.apache.curator.framework.recipes.cache.NodeCacheListener;
 import org.apache.curator.test.TestingServer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.TestLogger;
@@ -89,8 +90,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	@Test
 	public void testZooKeeperLeaderElectionRetrieval() throws Exception {
 		Configuration configuration = new Configuration();
-		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+		configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 
 		ZooKeeperLeaderElectionService leaderElectionService = null;
 		ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
@@ -134,8 +135,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	@Test
 	public void testZooKeeperReelection() throws Exception {
 		Configuration configuration = new Configuration();
-		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+		configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 
 		Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
 
@@ -217,8 +218,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	@Test
 	public void testZooKeeperReelectionWithReplacement() throws Exception {
 		Configuration configuration = new Configuration();
-		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+		configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 
 		int num = 3;
 		int numTries = 30;
@@ -295,8 +296,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 		final String leaderPath = "/leader";
 
 		Configuration configuration = new Configuration();
-		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+		configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 		configuration.setString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, leaderPath);
 
 		ZooKeeperLeaderElectionService leaderElectionService = null;
@@ -379,8 +380,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	@Test
 	public void testExceptionForwarding() throws Exception {
 		Configuration configuration = new Configuration();
-		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+		configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 
 		ZooKeeperLeaderElectionService leaderElectionService = null;
 		ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
@@ -448,8 +449,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	@Test
 	public void testEphemeralZooKeeperNodes() throws Exception {
 		Configuration configuration = new Configuration();
-		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+		configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 
 		ZooKeeperLeaderElectionService leaderElectionService;
 		ZooKeeperLeaderRetrievalService leaderRetrievalService = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index 0fe0644..70b1da0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -20,16 +20,19 @@ package org.apache.flink.runtime.leaderelection;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
-import org.apache.flink.configuration.ConfigConstants;
+
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+
 import scala.Option;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -82,8 +85,8 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 
 		long sleepingTime = 1000;
 
-		config.setString(ConfigConstants.HA_MODE, "zookeeper");
-		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+		config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
 
 		LeaderElectionService leaderElectionService = null;
 		LeaderElectionService faultyLeaderElectionService;
@@ -179,8 +182,8 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 	@Test
 	public void testTimeoutOfFindConnectingAddress() throws Exception {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.HA_MODE, "zookeeper");
-		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+		config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
 
 		FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
 
@@ -190,7 +193,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 		assertEquals(InetAddress.getLocalHost(), result);
 	}
 
-	class FindConnectingAddress implements Runnable {
+	static class FindConnectingAddress implements Runnable {
 
 		private final Configuration config;
 		private final FiniteDuration timeout;

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index 7dd7067..07cec32 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.testutils;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 
@@ -66,8 +67,8 @@ public class ZooKeeperTestUtils {
 		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1);
 
 		// ZooKeeper recovery mode
-		config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER");
-		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zooKeeperQuorum);
+		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperQuorum);
 
 		int connTimeout = 5000;
 		if (System.getenv().containsKey("CI")) {
@@ -75,20 +76,20 @@ public class ZooKeeperTestUtils {
 			connTimeout = 30000;
 		}
 
-		config.setInteger(ConfigConstants.HA_ZOOKEEPER_CONNECTION_TIMEOUT, connTimeout);
-		config.setInteger(ConfigConstants.HA_ZOOKEEPER_SESSION_TIMEOUT, connTimeout);
+		config.setInteger(HighAvailabilityOptions.ZOOKEEPER_CONNECTION_TIMEOUT, connTimeout);
+		config.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, connTimeout);
 
 		// File system state backend
 		config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
 		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, fsStateHandlePath + "/checkpoints");
-		config.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, fsStateHandlePath + "/recovery");
+		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, fsStateHandlePath + "/recovery");
 
 		// Akka failure detection and execution retries
 		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");
 		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s");
 		config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
 		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
-		config.setString(ConfigConstants.HA_JOB_DELAY, "10 s");
+		config.setString(HighAvailabilityOptions.HA_JOB_DELAY, "10 s");
 
 		return config;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
index daed4a4..d5895ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.util;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
@@ -71,7 +72,7 @@ public class ZooKeeperUtilTest extends TestLogger {
 	}
 
 	private Configuration setQuorum(Configuration conf, String quorum) {
-		conf.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, quorum);
+		conf.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, quorum);
 		return conf;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
index bd58515..66c4fac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
@@ -22,9 +22,11 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingCluster;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.utils.ZKPaths;
-import org.apache.flink.configuration.ConfigConstants;
+
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
+
 import org.apache.zookeeper.KeeperException;
 
 import java.util.List;
@@ -58,7 +60,7 @@ public class ZooKeeperTestEnvironment {
 				zooKeeperServer = new TestingServer(true);
 				zooKeeperCluster = null;
 
-				conf.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
+				conf.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
 						zooKeeperServer.getConnectString());
 			}
 			else {
@@ -67,7 +69,7 @@ public class ZooKeeperTestEnvironment {
 
 				zooKeeperCluster.start();
 
-				conf.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
+				conf.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
 						zooKeeperCluster.getConnectString());
 			}
 
@@ -127,7 +129,7 @@ public class ZooKeeperTestEnvironment {
 	 */
 	public CuratorFramework createClient() {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, getConnectString());
+		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, getConnectString());
 		return ZooKeeperUtils.startCuratorFramework(config);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index e878097..97016e4 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -28,12 +28,12 @@ import com.typesafe.config.ConfigFactory
 import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobExecutionResult
 
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.configuration.{HighAvailabilityOptions, ConfigConstants, Configuration}
 import org.apache.flink.runtime.client.JobClient
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
+import org.apache.flink.runtime.jobmanager.{HighAvailabilityMode, MemoryArchivist, JobManager}
 import org.apache.flink.runtime.testutils.TestingResourceManager
 import org.apache.flink.runtime.util.LeaderRetrievalUtils
 import org.apache.flink.runtime.{LogMessages, LeaderSessionMessageFilter, FlinkActor}
@@ -412,8 +412,7 @@ object TestingUtils {
     * @param configuration Configuration to use
     * @param jobManagerClass JobManager class to instantiate
     * @param prefix The prefix to use for the Actor names
-    *
-    * @return
+   * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
@@ -422,7 +421,8 @@ object TestingUtils {
       prefix: String)
     : ActorGateway = {
 
-    configuration.setString(ConfigConstants.HA_MODE,
+    configuration.setString(
+      HighAvailabilityOptions.HA_MODE,
       ConfigConstants.DEFAULT_HA_MODE)
 
       val (actor, _) = JobManager.startJobManagerActors(
@@ -502,7 +502,8 @@ object TestingUtils {
       configuration: Configuration)
   : ActorGateway = {
 
-    configuration.setString(ConfigConstants.HA_MODE,
+    configuration.setString(
+      HighAvailabilityOptions.HA_MODE,
       ConfigConstants.DEFAULT_HA_MODE)
 
     val actor = FlinkResourceManager.startResourceManagerActors(

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index 051175a..c005814 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.fs;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.SecureTestEnvironment;
@@ -215,10 +216,10 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
 			config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
-			config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+			config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 			config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
 			config.setString(ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
-			config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, hdfsURI + "/flink/recovery");
+			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery");
 			config.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints");
 
 			SecureTestEnvironment.populateFlinkSecureConfigurations(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
index b5e622b..0250c16 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.util;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.junit.rules.TemporaryFolder;
@@ -115,7 +116,7 @@ public class SecureTestEnvironment {
 			Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
 			flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, testKeytab);
 			flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, testPrincipal);
-			flinkConfig.setBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE, false);
+			flinkConfig.setBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE, false);
 			ctx.setFlinkConfiguration(flinkConfig);
 			TestingSecurityContext.install(ctx, getClientSecurityConfigurationMap());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index b774f97..aa5e7d3 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -32,6 +32,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
@@ -121,7 +122,7 @@ public class TestBaseUtils extends TestLogger {
 
 		if (startZooKeeper) {
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
-			config.setString(ConfigConstants.HA_MODE, "zookeeper");
+			config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 		}
 
 		return startCluster(config, singleActorSystem);

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
index cc8ab80..4d10bf1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
@@ -24,6 +24,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
@@ -564,7 +565,7 @@ public class ChaosMonkeyITCase extends TestLogger {
 			fail(fsCheckpoints + " does not exist: " + Arrays.toString(FileStateBackendBasePath.listFiles()));
 		}
 
-		File fsRecovery = new File(new URI(config.getString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, "")).getPath());
+		File fsRecovery = new File(new URI(config.getString(HighAvailabilityOptions.HA_STORAGE_PATH)).getPath());
 
 		LOG.info("Checking " + fsRecovery);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index 9b0d9de..a51f88b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -149,8 +150,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 	 */
 	public void testJobManagerFailure(String zkQuorum, final File coordinateDir) throws Exception {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER");
-		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zkQuorum);
+		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkQuorum);
 
 		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
 				"leader", 1, config);

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 48ad7f5..4bcde16 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.yarn;
 
 import org.apache.commons.cli.CommandLine;
+
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CommandLineOptions;
@@ -27,7 +28,7 @@ import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
@@ -38,20 +39,20 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+
 import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+
 import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
-import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
@@ -202,7 +203,7 @@ public class CliFrontendYarnAddressConfigurationTest {
 				CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()});
 
 		frontend.retrieveClient(options);
-		String zkNs = frontend.getConfiguration().getString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, "error");
+		String zkNs = frontend.getConfiguration().getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
 		Assert.assertTrue(zkNs.matches("application_\\d+_0042"));
 	}
 
@@ -216,7 +217,7 @@ public class CliFrontendYarnAddressConfigurationTest {
 				CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString(), "-yz", overrideZkNamespace});
 
 		frontend.retrieveClient(options);
-		String zkNs = frontend.getConfiguration().getString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, "error");
+		String zkNs = frontend.getConfiguration().getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
 		Assert.assertEquals(overrideZkNamespace, zkNs);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 9d6ff85..79f790f 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
@@ -119,7 +120,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 			zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
 			"@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +
 			"@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" +
-			"@@" + ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH + "=" + fsStateHandlePath + "/recovery");
+			"@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + fsStateHandlePath + "/recovery");
 		flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
 
 		ClusterClient yarnCluster = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 848013c..9481c24 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -22,6 +22,7 @@ import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.security.SecurityContext;
@@ -539,11 +540,11 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		// no user specified cli argument for namespace?
 		if (zkNamespace == null || zkNamespace.isEmpty()) {
 			// namespace defined in config? else use applicationId as default.
-			zkNamespace = flinkConfiguration.getString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, String.valueOf(appId));
+			zkNamespace = flinkConfiguration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, String.valueOf(appId));
 			setZookeeperNamespace(zkNamespace);
 		}
 
-		flinkConfiguration.setString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
+		flinkConfiguration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
 
 		if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfiguration)) {
 			// activate re-execution of failed applications

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index b27876b..10e229e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -25,6 +25,7 @@ import akka.actor.Props;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -472,7 +473,7 @@ public class YarnApplicationMasterRunner {
 		// override zookeeper namespace with user cli argument (if provided)
 		String cliZKNamespace = ENV.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE);
 		if (cliZKNamespace != null && !cliZKNamespace.isEmpty()) {
-			configuration.setString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, cliZKNamespace);
+			configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, cliZKNamespace);
 		}
 
 		// if a web monitor shall be started, set the port to random binding

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index d09340c..e4da140 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -29,6 +29,7 @@ import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
@@ -60,7 +61,6 @@ import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
-import static org.apache.flink.configuration.ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY;
 
 /**
  * Class handling the command line interface to the YARN session.
@@ -513,8 +513,8 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		if(null != applicationID) {
 			String zkNamespace = cmdLine.hasOption(ZOOKEEPER_NAMESPACE.getOpt()) ?
 					cmdLine.getOptionValue(ZOOKEEPER_NAMESPACE.getOpt())
-					: config.getString(HA_ZOOKEEPER_NAMESPACE_KEY, applicationID);
-			config.setString(HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
+					: config.getString(HighAvailabilityOptions.HA_CLUSTER_ID, applicationID);
+			config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
 
 			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
 			yarnDescriptor.setFlinkConfiguration(config);


Mime
View raw message