flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [1/4] flink git commit: [FLINK-4253] [config] Rename 'recovery.mode' key to 'high-availability'
Date Wed, 24 Aug 2016 10:11:55 GMT
Repository: flink
Updated Branches:
  refs/heads/master 720645587 -> 58165d69f


http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index 1c0290a..048fbee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -89,8 +89,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	@Test
 	public void testZooKeeperLeaderElectionRetrieval() throws Exception {
 		Configuration configuration = new Configuration();
-		configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
 
 		ZooKeeperLeaderElectionService leaderElectionService = null;
 		ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
@@ -134,8 +134,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	@Test
 	public void testZooKeeperReelection() throws Exception {
 		Configuration configuration = new Configuration();
-		configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
 
 		Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
 
@@ -217,8 +217,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	@Test
 	public void testZooKeeperReelectionWithReplacement() throws Exception {
 		Configuration configuration = new Configuration();
-		configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
 
 		int num = 3;
 		int numTries = 30;
@@ -295,9 +295,9 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 		final String leaderPath = "/leader";
 
 		Configuration configuration = new Configuration();
-		configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
-		configuration.setString(ConfigConstants.ZOOKEEPER_LEADER_PATH, leaderPath);
+		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		configuration.setString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, leaderPath);
 
 		ZooKeeperLeaderElectionService leaderElectionService = null;
 		ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
@@ -379,8 +379,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	@Test
 	public void testExceptionForwarding() throws Exception {
 		Configuration configuration = new Configuration();
-		configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
 
 		ZooKeeperLeaderElectionService leaderElectionService = null;
 		ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
@@ -448,8 +448,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	@Test
 	public void testEphemeralZooKeeperNodes() throws Exception {
 		Configuration configuration = new Configuration();
-		configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
 
 		ZooKeeperLeaderElectionService leaderElectionService;
 		ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
@@ -466,7 +466,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 			listener = new TestingListener();
 
 			client = ZooKeeperUtils.startCuratorFramework(configuration);
-			final String leaderPath = configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH,
+			final String leaderPath = configuration.getString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
 					ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
 			cache = new NodeCache(client, leaderPath);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index aae1840..5aace34 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -23,6 +23,7 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
@@ -43,6 +44,7 @@ import java.net.UnknownHostException;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 
@@ -82,8 +84,8 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 
 		long sleepingTime = 1000;
 
-		config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
-		config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+		config.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
 
 		LeaderElectionService leaderElectionService = null;
 		LeaderElectionService faultyLeaderElectionService;
@@ -179,8 +181,8 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 	@Test
 	public void testTimeoutOfFindConnectingAddress() throws Exception {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
-		config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+		config.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
 
 		FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
 
@@ -190,6 +192,46 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 		assertEquals(InetAddress.getLocalHost(), result);
 	}
 
+	@Test
+	public void testConnectionToZookeeperOverridingOldConfig() throws Exception {
+		Configuration config = new Configuration();
+		// The new config will be taken into effect
+		config.setString(ConfigConstants.RECOVERY_MODE, "standalone");
+		config.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+
+		FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+
+		LeaderRetrievalService leaderRetrievalService =
+			LeaderRetrievalUtils.createLeaderRetrievalService(config);
+		InetAddress result = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, timeout);
+
+		assertEquals(InetAddress.getLocalHost(), result);
+	}
+
+	@Test
+	public void testConnectionToStandAloneLeaderOverridingOldConfig() throws Exception {
+		Configuration config = new Configuration();
+		// The new config will be taken into effect
+		config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+		config.setString(ConfigConstants.HIGH_AVAILABILITY, "none");
+		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+
+		HighAvailabilityMode mode = HighAvailabilityMode.fromConfig(config);
+		assertTrue(mode == HighAvailabilityMode.NONE);
+	}
+
+	@Test
+	public void testConnectionToZookeeperUsingOldConfig() throws Exception {
+		Configuration config = new Configuration();
+		// The new config will be taken into effect
+		config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+
+		HighAvailabilityMode mode = HighAvailabilityMode.fromConfig(config);
+		assertTrue(mode == HighAvailabilityMode.ZOOKEEPER);
+	}
+
 	class FindConnectingAddress implements Runnable {
 
 		private final Configuration config;

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
index fac6162..66d523f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
@@ -212,7 +212,7 @@ public class JobManagerProcess extends TestJvmProcess {
 		 * <code>--port PORT</code>.
 		 *
 		 * <p>Other arguments are parsed to a {@link Configuration} and passed to the
-		 * JobManager, for instance: <code>--recovery.mode ZOOKEEPER --recovery.zookeeper.quorum
+		 * JobManager, for instance: <code>--high-availability ZOOKEEPER --recovery.zookeeper.quorum
 		 * "xyz:123:456"</code>.
 		 */
 		public static void main(String[] args) {

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
index 97e7cca..417dc88 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
@@ -96,7 +96,7 @@ public class TaskManagerProcess extends TestJvmProcess {
 
 		/**
 		 * All arguments are parsed to a {@link Configuration} and passed to the Taskmanager,
-		 * for instance: <code>--recovery.mode ZOOKEEPER --recovery.zookeeper.quorum "xyz:123:456"</code>.
+		 * for instance: <code>--high-availability ZOOKEEPER --recovery.zookeeper.quorum "xyz:123:456"</code>.
 		 */
 		public static void main(String[] args) throws Exception {
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index 2796337..c94842f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.testutils;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -31,12 +31,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public class ZooKeeperTestUtils {
 
 	/**
-	 * Creates a configuration to operate in {@link RecoveryMode#ZOOKEEPER}.
+	 * Creates a configuration to operate in {@link HighAvailabilityMode#ZOOKEEPER}.
 	 *
 	 * @param zooKeeperQuorum   ZooKeeper quorum to connect to
 	 * @param fsStateHandlePath Base path for file system state backend (for checkpoints and
 	 *                          recovery)
-	 * @return A new configuration to operate in {@link RecoveryMode#ZOOKEEPER}.
+	 * @return A new configuration to operate in {@link HighAvailabilityMode#ZOOKEEPER}.
 	 */
 	public static Configuration createZooKeeperRecoveryModeConfig(
 			String zooKeeperQuorum, String fsStateHandlePath) {
@@ -45,13 +45,13 @@ public class ZooKeeperTestUtils {
 	}
 
 	/**
-	 * Sets all necessary configuration keys to operate in {@link RecoveryMode#ZOOKEEPER}.
+	 * Sets all necessary configuration keys to operate in {@link HighAvailabilityMode#ZOOKEEPER}.
 	 *
 	 * @param config            Configuration to use
 	 * @param zooKeeperQuorum   ZooKeeper quorum to connect to
 	 * @param fsStateHandlePath Base path for file system state backend (for checkpoints and
 	 *                          recovery)
-	 * @return The modified configuration to operate in {@link RecoveryMode#ZOOKEEPER}.
+	 * @return The modified configuration to operate in {@link HighAvailabilityMode#ZOOKEEPER}.
 	 */
 	public static Configuration setZooKeeperRecoveryMode(
 			Configuration config,
@@ -66,8 +66,8 @@ public class ZooKeeperTestUtils {
 		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1);
 
 		// ZooKeeper recovery mode
-		config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
-		config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, zooKeeperQuorum);
+		config.setString(ConfigConstants.HIGH_AVAILABILITY, "ZOOKEEPER");
+		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zooKeeperQuorum);
 
 		int connTimeout = 5000;
 		if (System.getenv().containsKey("CI")) {
@@ -75,20 +75,20 @@ public class ZooKeeperTestUtils {
 			connTimeout = 30000;
 		}
 
-		config.setInteger(ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT, connTimeout);
-		config.setInteger(ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT, connTimeout);
+		config.setInteger(ConfigConstants.HA_ZOOKEEPER_CONNECTION_TIMEOUT, connTimeout);
+		config.setInteger(ConfigConstants.HA_ZOOKEEPER_SESSION_TIMEOUT, connTimeout);
 
 		// File system state backend
 		config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
 		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, fsStateHandlePath + "/checkpoints");
-		config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, fsStateHandlePath + "/recovery");
+		config.setString(ConfigConstants.ZOOKEEPER_HA_PATH, fsStateHandlePath + "/recovery");
 
 		// Akka failure detection and execution retries
 		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");
 		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s");
 		config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
 		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
-		config.setString(ConfigConstants.RECOVERY_JOB_DELAY, "10 s");
+		config.setString(ConfigConstants.HA_JOB_DELAY, "10 s");
 
 		return config;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
index 0d01f65..daed4a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
@@ -71,7 +71,7 @@ public class ZooKeeperUtilTest extends TestLogger {
 	}
 
 	private Configuration setQuorum(Configuration conf, String quorum) {
-		conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, quorum);
+		conf.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, quorum);
 		return conf;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
index 8fc80e0..467706f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
@@ -58,7 +58,7 @@ public class ZooKeeperTestEnvironment {
 				zooKeeperServer = new TestingServer(true);
 				zooKeeperCluster = null;
 
-				conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY,
+				conf.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
 						zooKeeperServer.getConnectString());
 			}
 			else {
@@ -67,7 +67,7 @@ public class ZooKeeperTestEnvironment {
 
 				zooKeeperCluster.start();
 
-				conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY,
+				conf.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
 						zooKeeperCluster.getConnectString());
 			}
 
@@ -127,7 +127,7 @@ public class ZooKeeperTestEnvironment {
 	 */
 	public CuratorFramework createClient() {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, getConnectString());
+		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, getConnectString());
 		return ZooKeeperUtils.startCuratorFramework(config);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 02a0fec..7d2b86c 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -423,7 +423,8 @@ object TestingUtils {
       prefix: String)
     : ActorGateway = {
 
-    configuration.setString(ConfigConstants.RECOVERY_MODE, ConfigConstants.DEFAULT_RECOVERY_MODE)
+    configuration.setString(ConfigConstants.HIGH_AVAILABILITY,
+      ConfigConstants.DEFAULT_HIGH_AVAILABILTY)
 
       val (actor, _) = JobManager.startJobManagerActors(
         configuration,
@@ -502,7 +503,8 @@ object TestingUtils {
       configuration: Configuration)
   : ActorGateway = {
 
-    configuration.setString(ConfigConstants.RECOVERY_MODE, ConfigConstants.DEFAULT_RECOVERY_MODE)
+    configuration.setString(ConfigConstants.HIGH_AVAILABILITY,
+      ConfigConstants.DEFAULT_HIGH_AVAILABILTY)
 
     val actor = FlinkResourceManager.startResourceManagerActors(
       configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index c51e666..7e5acee 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -120,7 +120,7 @@ public class TestBaseUtils extends TestLogger {
 
 		if (startZooKeeper) {
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
-			config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+			config.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
 		}
 
 		return startCluster(config, singleActorSystem);

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 42c0a6a..5dd4188 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -29,7 +29,7 @@ import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.jobmanager.{JobManager, RecoveryMode}
+import org.apache.flink.runtime.jobmanager.{JobManager, HighAvailabilityMode}
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
@@ -261,14 +261,16 @@ class ForkableFlinkMiniCluster(
   }
 
   override def start(): Unit = {
-    val zookeeperURL = configuration.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "")
+    val zookeeperURL = configuration.getString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, "")
 
-    zookeeperCluster = if(recoveryMode == RecoveryMode.ZOOKEEPER && zookeeperURL.equals("")) {
+    zookeeperCluster = if (recoveryMode == HighAvailabilityMode.ZOOKEEPER &&
+      zookeeperURL.equals("")) {
       LOG.info("Starting ZooKeeper cluster.")
 
       val testingCluster = new TestingCluster(1)
 
-      configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingCluster.getConnectString)
+      configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
+        testingCluster.getConnectString)
 
       testingCluster.start()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
index e97532c..22bf62a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
@@ -564,7 +564,7 @@ public class ChaosMonkeyITCase extends TestLogger {
 			fail(fsCheckpoints + " does not exist: " + Arrays.toString(FileStateBackendBasePath.listFiles()));
 		}
 
-		File fsRecovery = new File(new URI(config.getString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, "")).getPath());
+		File fsRecovery = new File(new URI(config.getString(ConfigConstants.ZOOKEEPER_HA_PATH, "")).getPath());
 
 		LOG.info("Checking " + fsRecovery);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index eccf971..e0e165d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -482,7 +482,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 
 		// ZooKeeper
 		String currentJobsPath = config.getString(
-				ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH,
+				ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH,
 				ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
 
 		Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
@@ -514,7 +514,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 
 		// ZooKeeper
 		String currentJobsPath = config.getString(
-			ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH,
+			ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH,
 			ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
 
 		Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index 88aeb09..0c52204 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -149,8 +149,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 	 */
 	public void testJobManagerFailure(String zkQuorum, final File coordinateDir) throws Exception {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
-		config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, zkQuorum);
+		config.setString(ConfigConstants.HIGH_AVAILABILITY, "ZOOKEEPER");
+		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zkQuorum);
 
 		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
 				"leader", 1, config);

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index 45ee839..7091339 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -91,11 +91,11 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 		int numJMs = 10;
 		int numTMs = 3;
 
-		configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
 		configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
-		configuration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
+		configuration.setString(ConfigConstants.ZOOKEEPER_HA_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
 
 		ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
 
@@ -139,12 +139,12 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 
 		Configuration configuration = new Configuration();
 
-		configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
 		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
 		configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
-		configuration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
+		configuration.setString(ConfigConstants.ZOOKEEPER_HA_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
 
 		// we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make
 		// sure that all TMs have registered to the JM prior to issueing the RecoverAllJobs message

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 60ae2ef..48ad7f5 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -202,7 +202,7 @@ public class CliFrontendYarnAddressConfigurationTest {
 				CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()});
 
 		frontend.retrieveClient(options);
-		String zkNs = frontend.getConfiguration().getString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, "error");
+		String zkNs = frontend.getConfiguration().getString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, "error");
 		Assert.assertTrue(zkNs.matches("application_\\d+_0042"));
 	}
 
@@ -216,7 +216,7 @@ public class CliFrontendYarnAddressConfigurationTest {
 				CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString(), "-yz", overrideZkNamespace});
 
 		frontend.retrieveClient(options);
-		String zkNs = frontend.getConfiguration().getString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, "error");
+		String zkNs = frontend.getConfiguration().getString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, "error");
 		Assert.assertEquals(overrideZkNamespace, zkNs);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 9b52975..25dbe53 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -115,7 +115,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 			zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
 			"@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +
 			"@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" +
-			"@@" + ConfigConstants.ZOOKEEPER_RECOVERY_PATH + "=" + fsStateHandlePath + "/recovery");
+			"@@" + ConfigConstants.ZOOKEEPER_HA_PATH + "=" + fsStateHandlePath + "/recovery");
 		flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
 
 		ClusterClient yarnCluster = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index ba07af1..f4c2032 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -23,7 +23,7 @@ import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -553,13 +553,13 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		// no user specified cli argument for namespace?
 		if (zkNamespace == null || zkNamespace.isEmpty()) {
 			// namespace defined in config? else use applicationId as default.
-			zkNamespace = flinkConfiguration.getString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, String.valueOf(appId));
+			zkNamespace = flinkConfiguration.getString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, String.valueOf(appId));
 			setZookeeperNamespace(zkNamespace);
 		}
 
-		flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
+		flinkConfiguration.setString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
 
-		if (RecoveryMode.isHighAvailabilityModeActivated(flinkConfiguration)) {
+		if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfiguration)) {
 			// activate re-execution of failed applications
 			appContext.setMaxAppAttempts(
 				flinkConfiguration.getInteger(

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 39b2510..87a2c98 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -430,7 +430,7 @@ public class YarnApplicationMasterRunner {
 		// override zookeeper namespace with user cli argument (if provided)
 		String cliZKNamespace = ENV.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE);
 		if (cliZKNamespace != null && !cliZKNamespace.isEmpty()) {
-			configuration.setString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, cliZKNamespace);
+			configuration.setString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, cliZKNamespace);
 		}
 
 		// if a web monitor shall be started, set the port to random binding

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index bee6a7a..3c93e34 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -58,7 +58,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
-import static org.apache.flink.configuration.ConfigConstants.ZOOKEEPER_NAMESPACE_KEY;
+import static org.apache.flink.configuration.ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY;
 
 /**
  * Class handling the command line interface to the YARN session.
@@ -503,8 +503,8 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		if(null != applicationID) {
 			String zkNamespace = cmdLine.hasOption(ZOOKEEPER_NAMESPACE.getOpt()) ?
 					cmdLine.getOptionValue(ZOOKEEPER_NAMESPACE.getOpt())
-					: config.getString(ZOOKEEPER_NAMESPACE_KEY, applicationID);
-			config.setString(ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
+					: config.getString(HA_ZOOKEEPER_NAMESPACE_KEY, applicationID);
+			config.setString(HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
 
 			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
 			yarnDescriptor.setFlinkConfiguration(config);


Mime
View raw message