flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [01/16] flink git commit: [FLINK-6498] Migrate Zookeeper configuration options
Date Sat, 01 Jul 2017 10:06:34 GMT
Repository: flink
Updated Branches:
  refs/heads/master daed46002 -> 6e3f839ac


[FLINK-6498] Migrate Zookeeper configuration options

This closes #4123.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f8390181
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f8390181
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f8390181

Branch: refs/heads/master
Commit: f839018131024860a1b25b13cea7e1313add28d5
Parents: 3d2f3f6
Author: zjureel <zjureel@gmail.com>
Authored: Wed Jun 14 17:38:25 2017 +0800
Committer: zentol <chesnay@apache.org>
Committed: Sat Jul 1 10:02:07 2017 +0200

----------------------------------------------------------------------
 .../connectors/fs/RollingSinkSecuredITCase.java |   2 +-
 .../flink/configuration/ConfigConstants.java    | 124 +++++++++++++++++--
 .../configuration/HighAvailabilityOptions.java  |  60 +++++++--
 .../services/MesosServicesUtils.java            |  10 +-
 .../flink/runtime/util/ZooKeeperUtils.java      |  53 +++-----
 .../ZooKeeperLeaderElectionTest.java            |   5 +-
 .../JobManagerHAJobGraphRecoveryITCase.java     |   7 +-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |   6 +-
 8 files changed, 191 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f8390181/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index 6bd75d4..866b2f3 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -218,7 +218,7 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
 			config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 			config.setString(CoreOptions.STATE_BACKEND, "filesystem");
-			config.setString(ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
+			config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
 			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery");
 			config.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f8390181/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 35d3d13..d467dfa 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -944,65 +944,111 @@ public final class ConfigConstants {
 
 	// --------------------------- ZooKeeper ----------------------------------
 
-	/** ZooKeeper servers. */
+	/**
+	 * ZooKeeper servers.
+	 * @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_QUORUM}.
+	 */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_QUORUM_KEY = "high-availability.zookeeper.quorum";
 
 	/**
 	 * File system state backend base path for recoverable state handles. Recovery state is
written
 	 * to this path and the file state handles are persisted for recovery.
+	 * @deprecated in favor of {@link HighAvailabilityOptions#HA_STORAGE_PATH}.
 	 */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_STORAGE_PATH = "high-availability.zookeeper.storageDir";
 
-	/** ZooKeeper root path. */
+	/**
+	 * ZooKeeper root path.
+	 * @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}.
+	 */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_DIR_KEY = "high-availability.zookeeper.path.root";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_NAMESPACE}. */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_NAMESPACE_KEY = "high-availability.zookeeper.path.namespace";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LATCH_PATH}. */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_LATCH_PATH = "high-availability.zookeeper.path.latch";
 
-	/** ZooKeeper root path (ZNode) for job graphs. */
+	/**
+	 * ZooKeeper root path (ZNode) for job graphs.
+	 * @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_JOBGRAPHS_PATH}.
+	 */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_JOBGRAPHS_PATH = "high-availability.zookeeper.path.jobgraphs";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LEADER_PATH}. */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_LEADER_PATH = "high-availability.zookeeper.path.leader";
 
-	/** ZooKeeper root path (ZNode) for completed checkpoints. */
+	/**
+	 * ZooKeeper root path (ZNode) for completed checkpoints.
+	 * @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINTS_PATH}.
+	 */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_CHECKPOINTS_PATH = "high-availability.zookeeper.path.checkpoints";
 
-	/** ZooKeeper root path (ZNode) for checkpoint counters. */
+	/**
+	 * ZooKeeper root path (ZNode) for checkpoint counters.
+	 * @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH}.
+	 */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "high-availability.zookeeper.path.checkpoint-counter";
 
-	/** ZooKeeper root path (ZNode) for Mesos workers. */
+	/**
+	 * ZooKeeper root path (ZNode) for Mesos workers.
+	 * @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_MESOS_WORKERS_PATH}.
+	 */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_MESOS_WORKERS_PATH = "high-availability.zookeeper.path.mesos-workers";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_SESSION_TIMEOUT}. */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_SESSION_TIMEOUT = "high-availability.zookeeper.client.session-timeout";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_CONNECTION_TIMEOUT}.
*/
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_CONNECTION_TIMEOUT = "high-availability.zookeeper.client.connection-timeout";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_RETRY_WAIT} */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_RETRY_WAIT = "high-availability.zookeeper.client.retry-wait";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_MAX_RETRY_ATTEMPTS}.
*/
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS = "high-availability.zookeeper.client.max-retry-attempts";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_CLIENT_ACL}. */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_CLIENT_ACL = "high-availability.zookeeper.client.acl";
 
+	/** @deprecated in favor of {@link SecurityOptions#ZOOKEEPER_SASL_DISABLE}. */
 	@PublicEvolving
+	@Deprecated
 	public static final String ZOOKEEPER_SASL_DISABLE = "zookeeper.sasl.disable";
 
+	/** @deprecated in favor of {@link SecurityOptions#ZOOKEEPER_SASL_SERVICE_NAME}. */
 	@PublicEvolving
+	@Deprecated
 	public static final String ZOOKEEPER_SASL_SERVICE_NAME = "zookeeper.sasl.service-name";
 
 	/** @deprecated Deprecated in favour of {@link #HA_ZOOKEEPER_QUORUM_KEY}. */
@@ -1632,51 +1678,103 @@ public final class ConfigConstants {
 
 	// --------------------------- ZooKeeper ----------------------------------
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}. */
+	@Deprecated
 	public static final String DEFAULT_ZOOKEEPER_DIR_KEY = "/flink";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_NAMESPACE}. */
+	@Deprecated
 	public static final String DEFAULT_ZOOKEEPER_NAMESPACE_KEY = "/default";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LATCH_PATH}. */
+	@Deprecated
 	public static final String DEFAULT_ZOOKEEPER_LATCH_PATH = "/leaderlatch";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LEADER_PATH}. */
+	@Deprecated
 	public static final String DEFAULT_ZOOKEEPER_LEADER_PATH = "/leader";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_JOBGRAPHS_PATH}.
*/
+	@Deprecated
 	public static final String DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH = "/jobgraphs";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINTS_PATH}.
*/
+	@Deprecated
 	public static final String DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH = "/checkpoints";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH}
*/
+	@Deprecated
 	public static final String DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "/checkpoint-counter";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_MESOS_WORKERS_PATH}.
*/
+	@Deprecated
 	public static final String DEFAULT_ZOOKEEPER_MESOS_WORKERS_PATH = "/mesos-workers";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_SESSION_TIMEOUT}. */
+	@Deprecated
 	public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = 60000;
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_CONNECTION_TIMEOUT}.
*/
+	@Deprecated
 	public static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT = 15000;
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_RETRY_WAIT}. */
+	@Deprecated
 	public static final int DEFAULT_ZOOKEEPER_RETRY_WAIT = 5000;
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_MAX_RETRY_ATTEMPTS}.
*/
+	@Deprecated
 	public static final int DEFAULT_ZOOKEEPER_MAX_RETRY_ATTEMPTS = 3;
 
 	// - Defaults for required ZooKeeper configuration keys -------------------
 
-	/** ZooKeeper default client port. */
+	/**
+	 * ZooKeeper default client port.
+	 * @deprecated in favor of {@code FlinkZookeeperQuorumPeer#DEFAULT_ZOOKEEPER_CLIENT_PORT}.
+	 */
+	@Deprecated
 	public static final int DEFAULT_ZOOKEEPER_CLIENT_PORT = 2181;
 
-	/** ZooKeeper default init limit. */
+	/**
+	 * ZooKeeper default init limit.
+	 * @deprecated in favor of {@code FlinkZookeeperQuorumPeer#DEFAULT_ZOOKEEPER_INIT_LIMIT}.
+	 */
+	@Deprecated
 	public static final int DEFAULT_ZOOKEEPER_INIT_LIMIT = 10;
 
-	/** ZooKeeper default sync limit. */
+	/**
+	 * ZooKeeper default sync limit.
+	 * @deprecated in favor of {@code FlinkZookeeperQuorumPeer#DEFAULT_ZOOKEEPER_SYNC_LIMIT}.
+	 */
+	@Deprecated
 	public static final int DEFAULT_ZOOKEEPER_SYNC_LIMIT = 5;
 
-	/** ZooKeeper default peer port. */
+	/**
+	 * ZooKeeper default peer port.
+	 * @deprecated in favor of {@code FlinkZookeeperQuorumPeer#DEFAULT_ZOOKEEPER_PEER_PORT}.
+	 */
+	@Deprecated
 	public static final int DEFAULT_ZOOKEEPER_PEER_PORT = 2888;
 
-	/** ZooKeeper default leader port. */
+	/**
+	 * ZooKeeper default leader port.
+	 * @deprecated in favor of {@code FlinkZookeeperQuorumPeer#DEFAULT_ZOOKEEPER_LEADER_PORT}.
+	 */
+	@Deprecated
 	public static final int DEFAULT_ZOOKEEPER_LEADER_PORT = 3888;
 
-	/** Defaults for ZK client security **/
+	/**
+	 * Defaults for ZK client security.
+	 * @deprecated in favor of {@link SecurityOptions#ZOOKEEPER_SASL_DISABLE}.
+	 */
+	@Deprecated
 	public static final boolean DEFAULT_ZOOKEEPER_SASL_DISABLE = true;
 
-	/** ACL options supported "creator" or "open" */
+	/**
+	 * ACL options supported "creator" or "open".
+	 * @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_CLIENT_ACL}.
+	 */
+	@Deprecated
 	public static final String DEFAULT_HA_ZOOKEEPER_CLIENT_ACL = "open";
 
 	// ----------------------------- Metrics ----------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f8390181/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index 2cfb25a..2b026b9 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -58,14 +58,6 @@ public class HighAvailabilityOptions {
 			key("high-availability.storageDir")
 			.noDefaultValue()
 			.withDeprecatedKeys("high-availability.zookeeper.storageDir", "recovery.zookeeper.storageDir");
-
-	/**
-	 * The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.
-	 */
-	public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM =
-			key("high-availability.zookeeper.quorum")
-			.noDefaultValue()
-			.withDeprecatedKeys("recovery.zookeeper.quorum");
 	
 
 	// ------------------------------------------------------------------------
@@ -93,6 +85,14 @@ public class HighAvailabilityOptions {
 	// ------------------------------------------------------------------------
 
 	/**
+	 * The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.
+	 */
+	public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM =
+			key("high-availability.zookeeper.quorum")
+			.noDefaultValue()
+			.withDeprecatedKeys("recovery.zookeeper.quorum");
+
+	/**
 	 * The root path under which Flink stores its entries in ZooKeeper
 	 */
 	public static final ConfigOption<String> HA_ZOOKEEPER_ROOT =
@@ -100,6 +100,46 @@ public class HighAvailabilityOptions {
 			.defaultValue("/flink")
 			.withDeprecatedKeys("recovery.zookeeper.path.root");
 
+	public static final ConfigOption<String> HA_ZOOKEEPER_NAMESPACE =
+			key("high-availability.zookeeper.path.namespace")
+			.noDefaultValue()
+			.withDeprecatedKeys("recovery.zookeeper.path.namespace");
+
+	public static final ConfigOption<String> HA_ZOOKEEPER_LATCH_PATH =
+			key("high-availability.zookeeper.path.latch")
+			.defaultValue("/leaderlatch")
+			.withDeprecatedKeys("recovery.zookeeper.path.latch");
+
+	/** ZooKeeper root path (ZNode) for job graphs. */
+	public static final ConfigOption<String> HA_ZOOKEEPER_JOBGRAPHS_PATH =
+			key("high-availability.zookeeper.path.jobgraphs")
+			.defaultValue("/jobgraphs")
+			.withDeprecatedKeys("recovery.zookeeper.path.jobgraphs");
+
+	public static final ConfigOption<String> HA_ZOOKEEPER_LEADER_PATH =
+			key("high-availability.zookeeper.path.leader")
+			.defaultValue("/leader")
+			.withDeprecatedKeys("recovery.zookeeper.path.leader");
+
+	/** ZooKeeper root path (ZNode) for completed checkpoints. */
+	public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINTS_PATH =
+			key("high-availability.zookeeper.path.checkpoints")
+			.defaultValue("/checkpoints")
+			.withDeprecatedKeys("recovery.zookeeper.path.checkpoints");
+
+	/** ZooKeeper root path (ZNode) for checkpoint counters. */
+	public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH =
+			key("high-availability.zookeeper.path.checkpoint-counter")
+			.defaultValue("/checkpoint-counter")
+			.withDeprecatedKeys("recovery.zookeeper.path.checkpoint-counter");
+
+	/** ZooKeeper root path (ZNode) for Mesos workers. */
+	@PublicEvolving
+	public static final ConfigOption<String> HA_ZOOKEEPER_MESOS_WORKERS_PATH =
+			key("high-availability.zookeeper.path.mesos-workers")
+			.defaultValue("/mesos-workers")
+			.withDeprecatedKeys("recovery.zookeeper.path.mesos-workers");
+
 	// ------------------------------------------------------------------------
 	//  ZooKeeper Client Settings
 	// ------------------------------------------------------------------------
@@ -128,6 +168,10 @@ public class HighAvailabilityOptions {
 			key("high-availability.zookeeper.path.running-registry")
 			.defaultValue("/running_job_registry/");
 
+	public static final ConfigOption<String> ZOOKEEPER_CLIENT_ACL =
+			key("high-availability.zookeeper.client.acl")
+			.defaultValue("open");
+
 	// ------------------------------------------------------------------------
 
 	/** Not intended to be instantiated */

http://git-wip-us.apache.org/repos/asf/flink/blob/f8390181/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
index a28020a..370a760 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.mesos.runtime.clusterframework.services;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.zookeeper.ZooKeeperUtilityFactory;
-import org.apache.flink.util.ConfigurationUtil;
 
 /**
  * Utilities for the {@link MesosServices}.
@@ -44,11 +43,8 @@ public class MesosServicesUtils {
 				return new StandaloneMesosServices();
 
 			case ZOOKEEPER:
-				final String zkMesosRootPath = ConfigurationUtil.getStringWithDeprecatedKeys(
-					configuration,
-					ConfigConstants.HA_ZOOKEEPER_MESOS_WORKERS_PATH,
-					ConfigConstants.DEFAULT_ZOOKEEPER_MESOS_WORKERS_PATH,
-					ConfigConstants.ZOOKEEPER_MESOS_WORKERS_PATH);
+				final String zkMesosRootPath = configuration.getString(
+					HighAvailabilityOptions.HA_ZOOKEEPER_MESOS_WORKERS_PATH);
 
 				ZooKeeperUtilityFactory zooKeeperUtilityFactory = new ZooKeeperUtilityFactory(
 					configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/f8390181/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 9ade5ec..a7ac500 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -25,10 +25,10 @@ import org.apache.curator.framework.api.ACLProvider;
 import org.apache.curator.framework.imps.DefaultACLProvider;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter;
@@ -40,7 +40,6 @@ import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.filesystem.FileSystemStateStorageHelper;
-import org.apache.flink.util.ConfigurationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
@@ -87,8 +86,7 @@ public class ZooKeeperUtils {
 
 		String namespace = configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
 
-		boolean disableSaslClient = configuration.getBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE,
-				ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE);
+		boolean disableSaslClient = configuration.getBoolean(SecurityOptions.ZOOKEEPER_SASL_DISABLE);
 
 		ACLProvider aclProvider;
 
@@ -96,7 +94,7 @@ public class ZooKeeperUtils {
 
 		if(disableSaslClient && aclMode == ZkClientACLMode.CREATOR) {
 			String errorMessage = "Cannot set ACL role to " + aclMode +"  since SASL authentication
is " +
-					"disabled through the " + ConfigConstants.ZOOKEEPER_SASL_DISABLE + " property";
+					"disabled through the " + SecurityOptions.ZOOKEEPER_SASL_DISABLE.key() + " property";
 			LOG.warn(errorMessage);
 			throw new IllegalConfigurationException(errorMessage);
 		}
@@ -185,11 +183,8 @@ public class ZooKeeperUtils {
 		final Configuration configuration,
 		final String pathSuffix)
 	{
-		String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys(
-			configuration,
-			ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
-			ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH,
-			ConfigConstants.ZOOKEEPER_LEADER_PATH) + pathSuffix;
+		String leaderPath = configuration.getString(
+			HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) + pathSuffix;
 
 		return new ZooKeeperLeaderRetrievalService(client, leaderPath);
 	}
@@ -221,16 +216,10 @@ public class ZooKeeperUtils {
 		final Configuration configuration,
 		final String pathSuffix)
 	{
-		final String latchPath = ConfigurationUtil.getStringWithDeprecatedKeys(
-			configuration,
-			ConfigConstants.HA_ZOOKEEPER_LATCH_PATH,
-			ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH,
-			ConfigConstants.ZOOKEEPER_LATCH_PATH) + pathSuffix;
-		final String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys(
-			configuration,
-			ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
-			ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH,
-			ConfigConstants.ZOOKEEPER_LEADER_PATH) + pathSuffix;
+		final String latchPath = configuration.getString(
+			HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH) + pathSuffix;
+		final String leaderPath = configuration.getString(
+			HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) + pathSuffix;
 
 		return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath);
 	}
@@ -254,11 +243,7 @@ public class ZooKeeperUtils {
 		RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage = createFileSystemStateStorage(configuration,
"submittedJobGraph");
 
 		// ZooKeeper submitted jobs root dir
-		String zooKeeperSubmittedJobsPath = ConfigurationUtil.getStringWithDeprecatedKeys(
-				configuration,
-				ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH,
-				ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH,
-				ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH);
+		String zooKeeperSubmittedJobsPath = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
 
 		return new ZooKeeperSubmittedJobGraphStore(
 				client, zooKeeperSubmittedJobsPath, stateStorage, executor);
@@ -284,11 +269,8 @@ public class ZooKeeperUtils {
 
 		checkNotNull(configuration, "Configuration");
 
-		String checkpointsPath = ConfigurationUtil.getStringWithDeprecatedKeys(
-				configuration,
-				ConfigConstants.HA_ZOOKEEPER_CHECKPOINTS_PATH,
-				ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH,
-				ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH);
+		String checkpointsPath = configuration.getString(
+			HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH);
 
 		RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage = createFileSystemStateStorage(
 			configuration,
@@ -317,11 +299,8 @@ public class ZooKeeperUtils {
 			Configuration configuration,
 			JobID jobId) {
 
-		String checkpointIdCounterPath = ConfigurationUtil.getStringWithDeprecatedKeys(
-				configuration,
-				ConfigConstants.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
-				ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
-				ConfigConstants.ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
+		String checkpointIdCounterPath = configuration.getString(
+				HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
 
 		checkpointIdCounterPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);
 
@@ -391,11 +370,11 @@ public class ZooKeeperUtils {
 		 * Return the configured {@link ZkClientACLMode}.
 		 *
 		 * @param config The config to parse
-		 * @return Configured ACL mode or {@link ConfigConstants#DEFAULT_HA_ZOOKEEPER_CLIENT_ACL}
if not
+		 * @return Configured ACL mode or the default defined by {@link HighAvailabilityOptions#ZOOKEEPER_CLIENT_ACL}
if not
 		 * configured.
 		 */
 		public static ZkClientACLMode fromConfig(Configuration config) {
-			String aclMode = config.getString(ConfigConstants.HA_ZOOKEEPER_CLIENT_ACL, null);
+			String aclMode = config.getString(HighAvailabilityOptions.ZOOKEEPER_CLIENT_ACL);
 			if (aclMode == null || aclMode.equalsIgnoreCase(ZkClientACLMode.OPEN.name())) {
 				return ZkClientACLMode.OPEN;
 			} else if (aclMode.equalsIgnoreCase(ZkClientACLMode.CREATOR.name())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f8390181/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index 6efd270..73cf063 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -298,7 +298,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 		final String FAULTY_CONTENDER_URL = "faultyContender";
 		final String leaderPath = "/leader";
 
-		configuration.setString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, leaderPath);
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH, leaderPath);
 
 		ZooKeeperLeaderElectionService leaderElectionService = null;
 		ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
@@ -463,8 +463,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 			testingContender = new TestingContender(TEST_URL, leaderElectionService);
 			listener = new TestingListener();
 
-			final String leaderPath = configuration.getString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
-					ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
+			final String leaderPath = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH);
 			cache = new NodeCache(client2, leaderPath);
 
 			ExistsCacheListener existsListener = new ExistsCacheListener(cache);

http://git-wip-us.apache.org/repos/asf/flink/blob/f8390181/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index 80b8e18..3f2eea3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -392,8 +393,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 
 		// ZooKeeper
 		String currentJobsPath = config.getString(
-				ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH,
-				ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
+				HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
 
 		Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
 
@@ -424,8 +424,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 
 		// ZooKeeper
 		String currentJobsPath = config.getString(
-			ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH,
-			ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
+			HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
 
 		Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f8390181/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index f15314a..73279da 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -66,7 +66,7 @@ import java.util.Properties;
 import java.util.concurrent.Callable;
 
 import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
-import static org.apache.flink.configuration.ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY;
+import static org.apache.flink.configuration.HighAvailabilityOptions.HA_ZOOKEEPER_NAMESPACE;
 
 /**
  * Class handling the command line interface to the YARN session.
@@ -597,9 +597,9 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 			String zkNamespace = cmd.hasOption(zookeeperNamespace.getOpt()) ?
 									cmd.getOptionValue(zookeeperNamespace.getOpt())
 									: yarnDescriptor.getFlinkConfiguration()
-									.getString(HA_ZOOKEEPER_NAMESPACE_KEY, cmd.getOptionValue(applicationId.getOpt()));
+									.getString(HA_ZOOKEEPER_NAMESPACE, cmd.getOptionValue(applicationId.getOpt()));
 			LOG.info("Going to use the ZK namespace: {}", zkNamespace);
-			yarnDescriptor.getFlinkConfiguration().setString(HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
+			yarnDescriptor.getFlinkConfiguration().setString(HA_ZOOKEEPER_NAMESPACE, zkNamespace);
 
 			try {
 				yarnCluster = yarnDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt()));


Mime
View raw message