flink-commits mailing list archives

From: trohrm...@apache.org
Subject: [05/16] flink git commit: [FLINK-6078] Remove CuratorFramework#close calls from ZooKeeper based HA services
Date: Fri, 05 May 2017 11:48:10 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index 5ba651f..9d2806c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -31,8 +31,11 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -70,12 +73,13 @@ import static org.junit.Assert.fail;
 public class ProcessFailureCancelingITCase extends TestLogger {
 	
 	@Test
-	public void testCancelingOnProcessFailure() {
+	public void testCancelingOnProcessFailure() throws Exception {
 		final StringWriter processOutput = new StringWriter();
 
 		ActorSystem jmActorSystem = null;
 		Process taskManagerProcess = null;
-		
+		HighAvailabilityServices highAvailabilityServices = null;
+
 		try {
 			// check that we run this test only if the java command
 			// is available on this machine
@@ -101,6 +105,13 @@ public class ProcessFailureCancelingITCase extends TestLogger {
 			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2000 s");
 			jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 10);
 			jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
+			jmConfig.setString(JobManagerOptions.ADDRESS, localAddress._1());
+			jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
+
+			highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+				jmConfig,
+				TestingUtils.defaultExecutor(),
+				HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
 
 			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<>(localAddress));
 			ActorRef jmActor = JobManager.startJobManagerActors(
@@ -108,6 +119,7 @@ public class ProcessFailureCancelingITCase extends TestLogger {
 				jmActorSystem,
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
+				highAvailabilityServices,
 				JobManager.class,
 				MemoryArchivist.class)._1();
 
@@ -193,12 +205,10 @@ public class ProcessFailureCancelingITCase extends TestLogger {
 			// all seems well :-)
 		}
 		catch (Exception e) {
-			e.printStackTrace();
 			printProcessLog("TaskManager", processOutput.toString());
-			fail(e.getMessage());
+			throw e;
 		}
 		catch (Error e) {
-			e.printStackTrace();
 			printProcessLog("TaskManager 1", processOutput.toString());
 			throw e;
 		}
@@ -209,6 +219,10 @@ public class ProcessFailureCancelingITCase extends TestLogger {
 			if (jmActorSystem != null) {
 				jmActorSystem.shutdown();
 			}
+
+			if (highAvailabilityServices != null) {
+				highAvailabilityServices.closeAndCleanupAllData();
+			}
 		}
 	}
 	
@@ -250,7 +264,11 @@ public class ProcessFailureCancelingITCase extends TestLogger {
 		}
 		
 		// tell the JobManager to cancel the job
-		jobManager.tell(new JobManagerMessages.CancelJob(jobId), ActorRef.noSender());
+		jobManager.tell(
+			new JobManagerMessages.LeaderSessionMessage(
+				HighAvailabilityServices.DEFAULT_LEADER_ID,
+				new JobManagerMessages.CancelJob(jobId)),
+			ActorRef.noSender());
 	}
 
 	private void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay)

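The hunks above follow the lifecycle this commit introduces for the tests: the test creates the HighAvailabilityServices itself, hands them to the JobManager actors, and releases them at the end, and messages to the JobManager are wrapped in a LeaderSessionMessage. A minimal sketch of that pattern, using only the calls visible in the diff; the address, port, jobManager ActorRef and jobId are placeholders:

    Configuration jmConfig = new Configuration();
    jmConfig.setString(JobManagerOptions.ADDRESS, "localhost"); // placeholder address
    jmConfig.setInteger(JobManagerOptions.PORT, 6123);          // placeholder port

    HighAvailabilityServices haServices = null;
    try {
        // the HA services are created by the test and shared with the JobManager actors
        haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
            jmConfig,
            TestingUtils.defaultExecutor(),
            HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);

        // ... start the JobManager actors with haServices and run the test body ...

        // messages to the JobManager now carry the leader session id
        jobManager.tell(
            new JobManagerMessages.LeaderSessionMessage(
                HighAvailabilityServices.DEFAULT_LEADER_ID,
                new JobManagerMessages.CancelJob(jobId)),
            ActorRef.noSender());
    } finally {
        if (haServices != null) {
            // the test owns the services, so it also removes the stored HA data
            haServices.closeAndCleanupAllData();
        }
    }
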
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
index fa7e071..bafdd9f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
@@ -32,6 +32,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import scala.concurrent.Await;
@@ -87,8 +88,7 @@ public class TaskManagerFailureRecoveryITCase extends TestLogger {
 			// for the result
 			List<Long> resultCollection = new ArrayList<Long>();
 
-			final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getLeaderRPCPort());
+			final ExecutionEnvironment env = new TestEnvironment(cluster, PARALLELISM, false);
 
 			env.setParallelism(PARALLELISM);
 			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000));

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index 5eadba6..40a8f09 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
@@ -42,7 +41,6 @@ import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -80,6 +78,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 	public static void tearDown() throws Exception {
 		if (zkServer != null) {
 			zkServer.close();
+			zkServer = null;
 		}
 	}
 
@@ -112,8 +111,8 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 				cluster.waitForTaskManagersToBeRegisteredAtJobManager(leadingJM.actor());
 
 				Future<Object> registeredTMs = leadingJM.ask(
-						JobManagerMessages.getRequestNumberRegisteredTaskManager(),
-						timeout);
+					JobManagerMessages.getRequestNumberRegisteredTaskManager(),
+					timeout);
 
 				int numRegisteredTMs = (Integer) Await.result(registeredTMs, timeout);
 
@@ -122,8 +121,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 				cluster.clearLeader();
 				leadingJM.tell(PoisonPill.getInstance());
 			}
-		}
-		finally {
+		} finally {
 			cluster.stop();
 		}
 	}
@@ -273,15 +271,10 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 		@Override
 		public void run() {
 			try {
-				LeaderRetrievalService lrService =
-						LeaderRetrievalUtils.createLeaderRetrievalService(
-							cluster.configuration(),
-							false);
-
 				JobExecutionResult result = JobClient.submitJobAndWait(
 						clientActorSystem,
 						cluster.configuration(),
-						lrService,
+						cluster.highAvailabilityServices(),
 						graph,
 						timeout,
 						false,

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 2eecf49..82e8d94 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -27,14 +27,13 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.taskmanager.TaskManager;
@@ -44,9 +43,11 @@ import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
 import org.apache.flink.runtime.testingUtils.TestingTaskManager;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -71,36 +72,48 @@ import java.util.concurrent.TimeUnit;
  * Step 1: Migrate the job to 1.3 by submitting the same job used for the 1.2 savepoint, and create a new savepoint.
  * Step 2: Modify the job topology, and restore from the savepoint created in step 1.
  */
-public abstract class AbstractOperatorRestoreTestBase {
+public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
 
 	@Rule
 	public final TemporaryFolder tmpFolder = new TemporaryFolder();
 
 	private static ActorSystem actorSystem = null;
+	private static HighAvailabilityServices highAvailabilityServices = null;
 	private static ActorGateway jobManager = null;
 	private static ActorGateway archiver = null;
 	private static ActorGateway taskManager = null;
 
-	private static final FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
+	private static final FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
 
 	@BeforeClass
 	public static void setupCluster() throws Exception {
-		FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
+		final Configuration configuration = new Configuration();
+
+		FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
 
 		actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
 
+		highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+			configuration,
+			TestingUtils.defaultExecutor());
+
 		Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
-			new Configuration(),
+			configuration,
 			actorSystem,
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
+			highAvailabilityServices,
 			Option.apply("jm"),
 			Option.apply("arch"),
 			TestingJobManager.class,
 			TestingMemoryArchivist.class);
 
-		jobManager = new AkkaActorGateway(master._1(), HighAvailabilityServices.DEFAULT_LEADER_ID);
-		archiver = new AkkaActorGateway(master._2(), HighAvailabilityServices.DEFAULT_LEADER_ID);
+		jobManager = LeaderRetrievalUtils.retrieveLeaderGateway(
+			highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+			actorSystem,
+			timeout);
+
+		archiver = new AkkaActorGateway(master._2(), jobManager.leaderSessionID());
 
 		Configuration tmConfig = new Configuration();
 		tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
@@ -109,13 +122,13 @@ public abstract class AbstractOperatorRestoreTestBase {
 			tmConfig,
 			ResourceID.generate(),
 			actorSystem,
+			highAvailabilityServices,
 			"localhost",
 			Option.apply("tm"),
-			Option.<LeaderRetrievalService>apply(new StandaloneLeaderRetrievalService(jobManager.path(), HighAvailabilityServices.DEFAULT_LEADER_ID)),
 			true,
 			TestingTaskManager.class);
 
-		taskManager = new AkkaActorGateway(taskManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
+		taskManager = new AkkaActorGateway(taskManagerRef, jobManager.leaderSessionID());
 
 		// Wait until connected
 		Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
@@ -123,7 +136,11 @@ public abstract class AbstractOperatorRestoreTestBase {
 	}
 
 	@AfterClass
-	public static void tearDownCluster() {
+	public static void tearDownCluster() throws Exception {
+		if (highAvailabilityServices != null) {
+			highAvailabilityServices.closeAndCleanupAllData();
+		}
+
 		if (actorSystem != null) {
 			actorSystem.shutdown();
 		}

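In AbstractOperatorRestoreTestBase the hand-built StandaloneLeaderRetrievalService disappears: gateways are now resolved through the shared HighAvailabilityServices and therefore carry the actual leader session id. A condensed sketch of that step, treating the surrounding test fields (actorSystem, timeout, and an archiver ActorRef) as placeholders:

    HighAvailabilityServices haServices =
        HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
            new Configuration(),
            TestingUtils.defaultExecutor());

    // resolve the JobManager gateway through the leader retriever of the HA services
    ActorGateway jobManager = LeaderRetrievalUtils.retrieveLeaderGateway(
        haServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
        actorSystem,
        timeout);

    // co-located actors reuse the leader session id resolved above
    ActorGateway archiver = new AkkaActorGateway(archiverRef, jobManager.leaderSessionID());
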
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index 229d3fd..45686ef 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -45,6 +45,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
@@ -86,32 +87,24 @@ public class TimestampITCase extends TestLogger {
 
 	@BeforeClass
 	public static void startCluster() {
-		try {
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
-			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
 
-			cluster = new LocalFlinkMiniCluster(config, false);
+		cluster = new LocalFlinkMiniCluster(config, false);
 
-			cluster.start();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Failed to start test cluster: " + e.getMessage());
-		}
+		cluster.start();
+
+		TestStreamEnvironment.setAsContext(cluster, PARALLELISM);
 	}
 
 	@AfterClass
 	public static void shutdownCluster() {
-		try {
-			cluster.shutdown();
-			cluster = null;
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Failed to stop test cluster: " + e.getMessage());
-		}
+		cluster.shutdown();
+		cluster = null;
+
+		TestStreamEnvironment.unsetAsContext();
 	}
 
 	/**
@@ -132,8 +125,7 @@ public class TimestampITCase extends TestLogger {
 
 		long initialTime = 0L;
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 		env.setParallelism(PARALLELISM);
@@ -182,8 +174,7 @@ public class TimestampITCase extends TestLogger {
 
 		long initialTime = 0L;
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 		env.setParallelism(PARALLELISM);
@@ -270,8 +261,7 @@ public class TimestampITCase extends TestLogger {
 		final int NUM_ELEMENTS = 10;
 
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 		env.setParallelism(PARALLELISM);
@@ -297,8 +287,7 @@ public class TimestampITCase extends TestLogger {
 	public void testDisabledTimestamps() throws Exception {
 		final int NUM_ELEMENTS = 10;
 		
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		
 		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 		env.setParallelism(PARALLELISM);
@@ -325,7 +314,7 @@ public class TimestampITCase extends TestLogger {
 	public void testTimestampExtractorWithAutoInterval() throws Exception {
 		final int NUM_ELEMENTS = 10;
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 		env.getConfig().setAutoWatermarkInterval(10);
@@ -389,7 +378,7 @@ public class TimestampITCase extends TestLogger {
 	public void testTimestampExtractorWithCustomWatermarkEmit() throws Exception {
 		final int NUM_ELEMENTS = 10;
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 		env.getConfig().setAutoWatermarkInterval(10);
@@ -451,7 +440,7 @@ public class TimestampITCase extends TestLogger {
 	public void testTimestampExtractorWithDecreasingCustomWatermarkEmit() throws Exception {
 		final int NUM_ELEMENTS = 10;
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 		env.getConfig().setAutoWatermarkInterval(1);
@@ -513,7 +502,7 @@ public class TimestampITCase extends TestLogger {
 	public void testTimestampExtractorWithLongMaxWatermarkFromSource() throws Exception {
 		final int NUM_ELEMENTS = 10;
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 		env.getConfig().setAutoWatermarkInterval(1);
@@ -574,7 +563,7 @@ public class TimestampITCase extends TestLogger {
 		final int NUM_ELEMENTS = 10;
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
+				.getExecutionEnvironment();
 
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 		env.getConfig().setAutoWatermarkInterval(10);
@@ -630,7 +619,7 @@ public class TimestampITCase extends TestLogger {
 	@Test
 	public void testEventTimeSourceWithProcessingTime() throws Exception {
 		StreamExecutionEnvironment env = 
-				StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
+				StreamExecutionEnvironment.getExecutionEnvironment();
 		
 		env.setParallelism(2);
 		env.getConfig().disableSysoutLogging();
@@ -651,8 +640,7 @@ public class TimestampITCase extends TestLogger {
 	
 	@Test
 	public void testErrorOnEventTimeOverProcessingTime() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		env.setParallelism(2);
 		env.getConfig().disableSysoutLogging();
@@ -682,8 +670,7 @@ public class TimestampITCase extends TestLogger {
 
 	@Test
 	public void testErrorOnEventTimeWithoutTimestamps() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		env.setParallelism(2);
 		env.getConfig().disableSysoutLogging();

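TimestampITCase drops the createRemoteEnvironment("localhost", port) calls in favor of the context-environment pattern: the mini cluster is registered once as the execution context, and each test simply calls getExecutionEnvironment(). A short sketch of the resulting setup and teardown, using only the calls visible in the diff:

    @BeforeClass
    public static void startCluster() {
        Configuration config = new Configuration();
        // ... task manager, slot and memory settings as in the hunk above ...
        cluster = new LocalFlinkMiniCluster(config, false);
        cluster.start();

        // register the cluster as the default context for getExecutionEnvironment()
        TestStreamEnvironment.setAsContext(cluster, PARALLELISM);
    }

    @AfterClass
    public static void shutdownCluster() {
        cluster.shutdown();
        cluster = null;

        TestStreamEnvironment.unsetAsContext();
    }

    // inside the individual tests:
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
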
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index 5db02d1..e5f26c5 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.scala.runtime.jobmanager
 
-import akka.actor.{ActorSystem, PoisonPill}
+import akka.actor.ActorSystem
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.messages.Acknowledge
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered
 import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.{JobManagerTerminated, NotifyWhenJobManagerTerminated}
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
@@ -66,11 +65,8 @@ class JobManagerFailsITCase(_system: ActorSystem)
           jmGateway.tell(RequestNumberRegisteredTaskManager, self)
           expectMsg(1)
 
-          tm ! NotifyWhenJobManagerTerminated(jmGateway.leaderSessionID())
-
-          jmGateway.tell(PoisonPill, self)
-
-          expectMsgClass(classOf[JobManagerTerminated])
+          // stop the current leader and make sure that he is gone
+          TestingUtils.stopActorGracefully(jmGateway)
 
           cluster.restartLeadingJobManager()
 
@@ -109,11 +105,8 @@ class JobManagerFailsITCase(_system: ActorSystem)
           jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), self)
           expectMsg(JobSubmitSuccess(jobGraph.getJobID))
 
-          tm.tell(NotifyWhenJobManagerTerminated(jmGateway.leaderSessionID()), self)
-
-          jmGateway.tell(PoisonPill, self)
-
-          expectMsgClass(classOf[JobManagerTerminated])
+          // stop the current leader and make sure that he is gone
+          TestingUtils.stopActorGracefully(jmGateway)
 
           cluster.restartLeadingJobManager()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index 424fc8b..61cb8cc 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -226,7 +226,7 @@ class TaskManagerFailsITCase(_system: ActorSystem)
 
           tm ! NotifyWhenRegisteredAtJobManager
 
-          expectMsg(RegisteredAtJobManager)
+          expectMsgClass(classOf[RegisteredAtJobManager])
 
           jmGateway.tell(SubmitJob(jobGraph2, ListeningBehaviour.EXECUTION_RESULT), self)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index a503115..264b6aa 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.yarn;
 
-import akka.actor.ActorSystem;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.DefaultParser;
@@ -27,7 +26,6 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.RunOptions;
-import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
@@ -36,7 +34,6 @@ import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -129,7 +126,7 @@ public class FlinkYarnSessionCliTest {
 		Assert.assertEquals(2, descriptor.getTaskManagerCount());
 
 		Configuration config = new Configuration();
-		CliFrontend.setJobManagerAddressInConfig(config, new InetSocketAddress("test", 9000));
+		CliFrontend.setJobManagerAddressInConfig(config, new InetSocketAddress("localhost", 9000));
 		ClusterClient client = new TestingYarnClusterClient(descriptor, config);
 		Assert.assertEquals(6, client.getMaxSlots());
 	}
@@ -175,7 +172,7 @@ public class FlinkYarnSessionCliTest {
 
 	private static class TestingYarnClusterClient extends YarnClusterClient {
 
-		public TestingYarnClusterClient(AbstractYarnClusterDescriptor descriptor, Configuration config) throws IOException, YarnException {
+		public TestingYarnClusterClient(AbstractYarnClusterDescriptor descriptor, Configuration config) throws Exception {
 			super(descriptor,
 				Mockito.mock(YarnClient.class),
 				Mockito.mock(ApplicationReport.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index bc1af65..1f043ef 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -24,14 +24,15 @@ import akka.testkit.JavaTestKit;
 import org.apache.curator.test.TestingServer;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
@@ -56,7 +57,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 
 	protected static ActorSystem actorSystem;
 
-	protected static final int numberApplicationAttempts = 10;
+	protected static final int numberApplicationAttempts = 3;
 
 	@Rule
 	public TemporaryFolder temp = new TemporaryFolder();
@@ -128,9 +129,19 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 
 		final FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES);
 
+		HighAvailabilityServices highAvailabilityServices = null;
+
 		try {
 			yarnCluster = flinkYarnClient.deploy();
-			final Configuration config = yarnCluster.getFlinkConfiguration();
+
+			final ClusterClient finalYarnCluster = yarnCluster;
+
+			highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+				yarnCluster.getFlinkConfiguration(),
+				Executors.directExecutor(),
+				HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
+
+			final HighAvailabilityServices finalHighAvailabilityServices = highAvailabilityServices;
 
 			new JavaTestKit(actorSystem) {{
 				for (int attempt = 0; attempt < numberKillingAttempts; attempt++) {
@@ -138,8 +149,10 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 						@Override
 						protected void run() {
 							try {
-								LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config, false);
-								ActorGateway gateway = LeaderRetrievalUtils.retrieveLeaderGateway(lrs, actorSystem, timeout);
+								ActorGateway gateway = LeaderRetrievalUtils.retrieveLeaderGateway(
+									finalHighAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+									actorSystem,
+									timeout);
 								ActorGateway selfGateway = new AkkaActorGateway(getRef(), gateway.leaderSessionID());
 
 								gateway.tell(new TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), selfGateway);
@@ -158,10 +171,13 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 					@Override
 					protected void run() {
 						try {
-							LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config, false);
-							ActorGateway gateway2 = LeaderRetrievalUtils.retrieveLeaderGateway(lrs, actorSystem, timeout);
-							ActorGateway selfGateway = new AkkaActorGateway(getRef(), gateway2.leaderSessionID());
-							gateway2.tell(new TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), selfGateway);
+							ActorGateway gateway = LeaderRetrievalUtils.retrieveLeaderGateway(
+								finalHighAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+								actorSystem,
+								timeout);
+							ActorGateway selfGateway = new AkkaActorGateway(getRef(), gateway.leaderSessionID());
+
+							gateway.tell(new TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), selfGateway);
 
 							expectMsgEquals(Acknowledge.get());
 						} catch (Exception e) {
@@ -175,6 +191,10 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 			if (yarnCluster != null) {
 				yarnCluster.shutdown();
 			}
+
+			if (highAvailabilityServices != null) {
+				highAvailabilityServices.closeAndCleanupAllData();
+			}
 		}
 	}
 }

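The YARN HA test builds its services on the client side from the deployed cluster's configuration, with a direct executor and TRY_ADDRESS_RESOLUTION (in contrast to the NO_ADDRESS_RESOLUTION used where the services run next to the JobManager). A minimal sketch, treating yarnCluster as the cluster handle deployed in the test above:

    HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
        yarnCluster.getFlinkConfiguration(),
        Executors.directExecutor(),
        HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);

    try {
        // ... re-resolve the leader gateway after each JobManager kill, as in the test above ...
    } finally {
        // the test owns the services and therefore also removes the ZooKeeper HA data
        haServices.closeAndCleanupAllData();
    }
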
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 68cc73d..f45fe82 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -224,7 +224,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 	 * Test the YARN Java API
 	 */
 	@Test
-	public void testJavaAPI() {
+	public void testJavaAPI() throws Exception {
 		final int WAIT_TIME = 15;
 		LOG.info("Starting testJavaAPI()");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index a5a6c36..65525f2 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -1307,7 +1307,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			ApplicationReport report,
 			org.apache.flink.configuration.Configuration flinkConfiguration,
 			Path sessionFilesDir,
-			boolean perJobCluster) throws IOException, YarnException {
+			boolean perJobCluster) throws Exception {
 		return new YarnClusterClient(
 			descriptor,
 			yarnClient,

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index ee87cfe..b62f957 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -26,14 +26,16 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.jobmaster.JobMaster;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -42,7 +44,6 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
@@ -215,6 +216,7 @@ public class YarnApplicationMasterRunner {
 	protected int runApplicationMaster(Configuration config) {
 		ActorSystem actorSystem = null;
 		WebMonitor webMonitor = null;
+		HighAvailabilityServices highAvailabilityServices = null;
 
 		int numberProcessors = Hardware.getNumberCPUCores();
 
@@ -332,6 +334,16 @@ public class YarnApplicationMasterRunner {
 			// 3) Resource Master for YARN
 			// 4) Process reapers for the JobManager and Resource Master
 
+			// 0: Start the JobManager services
+
+			// update the configuration used to create the high availability services
+			config.setString(JobManagerOptions.ADDRESS, akkaHostname);
+			config.setInteger(JobManagerOptions.PORT, akkaPort);
+
+			highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+				config,
+				ioExecutor,
+				HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
 
 			// 1: the JobManager
 			LOG.debug("Starting JobManager actor");
@@ -342,6 +354,7 @@ public class YarnApplicationMasterRunner {
 				actorSystem,
 				futureExecutor,
 				ioExecutor,
+				highAvailabilityServices,
 				new Some<>(JobMaster.JOB_MANAGER_NAME),
 				Option.<String>empty(),
 				getJobManagerClass(),
@@ -351,7 +364,12 @@ public class YarnApplicationMasterRunner {
 			// 2: the web monitor
 			LOG.debug("Starting Web Frontend");
 
-			webMonitor = BootstrapTools.startWebMonitorIfConfigured(config, actorSystem, jobManager, LOG);
+			webMonitor = BootstrapTools.startWebMonitorIfConfigured(
+				config,
+				highAvailabilityServices,
+				actorSystem,
+				jobManager,
+				LOG);
 
 			String protocol = "http://";
 			if (config.getBoolean(ConfigConstants.JOB_MANAGER_WEB_SSL_ENABLED,
@@ -364,15 +382,11 @@ public class YarnApplicationMasterRunner {
 			// 3: Flink's Yarn ResourceManager
 			LOG.debug("Starting YARN Flink Resource Manager");
 
-			// we need the leader retrieval service here to be informed of new leaders and session IDs
-			LeaderRetrievalService leaderRetriever = 
-				LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager);
-
 			Props resourceMasterProps = YarnFlinkResourceManager.createActorProps(
 				getResourceManagerClass(),
 				config,
 				yarnConfig,
-				leaderRetriever,
+				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
 				appMasterHostname,
 				webMonitorURL,
 				taskManagerParameters,
@@ -437,6 +451,14 @@ public class YarnApplicationMasterRunner {
 			}
 		}
 
+		if (highAvailabilityServices != null) {
+			try {
+				highAvailabilityServices.close();
+			} catch (Throwable t) {
+				LOG.error("Failed to stop the high availability services.", t);
+			}
+		}
+
 		org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(
 			AkkaUtils.getTimeout(config).toMillis(),
 			TimeUnit.MILLISECONDS,

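The application master, by contrast, only uses the services: its cleanup path calls close() rather than closeAndCleanupAllData(), so the handle is released while the stored HA data appears to be left in place for a later attempt (only owning components, such as the tests earlier in this commit, clean the data up). A condensed sketch of that cleanup step, shaped after the hunk above:

    if (highAvailabilityServices != null) {
        try {
            // release this process's handle; the HA data itself is kept
            highAvailabilityServices.close();
        } catch (Throwable t) {
            LOG.error("Failed to stop the high availability services.", t);
        }
    }
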
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index f044cdd..e70af09 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -32,9 +32,8 @@ import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
 import org.apache.flink.runtime.clusterframework.messages.ShutdownClusterAfterJob;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.hadoop.conf.Configuration;
@@ -109,7 +108,7 @@ public class YarnClusterClient extends ClusterClient {
 		final ApplicationReport appReport,
 		org.apache.flink.configuration.Configuration flinkConfig,
 		Path sessionFilesDir,
-		boolean newlyCreatedCluster) throws IOException, YarnException {
+		boolean newlyCreatedCluster) throws Exception {
 
 		super(flinkConfig);
 
@@ -123,7 +122,10 @@ public class YarnClusterClient extends ClusterClient {
 		this.trackingURL = appReport.getTrackingUrl();
 		this.newlyCreatedCluster = newlyCreatedCluster;
 
-		this.applicationClient = new LazApplicationClientLoader(flinkConfig, actorSystemLoader);
+		this.applicationClient = new LazApplicationClientLoader(
+			flinkConfig,
+			actorSystemLoader,
+			highAvailabilityServices);
 
 		this.pollingRunner = new PollingThread(yarnClient, appId);
 		this.pollingRunner.setDaemon(true);
@@ -443,7 +445,12 @@ public class YarnClusterClient extends ClusterClient {
 		@Override
 		public void run() {
 			LOG.info("Shutting down YarnClusterClient from the client shutdown hook");
-			shutdown();
+
+			try {
+				shutdown();
+			} catch (Throwable t) {
+				LOG.warn("Could not properly shut down the yarn cluster client.", t);
+			}
 		}
 	}
 
@@ -545,14 +552,17 @@ public class YarnClusterClient extends ClusterClient {
 
 		private final org.apache.flink.configuration.Configuration flinkConfig;
 		private final LazyActorSystemLoader actorSystemLoader;
+		private final HighAvailabilityServices highAvailabilityServices;
 
 		private ActorRef applicationClient;
 
 		private LazApplicationClientLoader(
 				org.apache.flink.configuration.Configuration flinkConfig,
-				LazyActorSystemLoader actorSystemLoader) {
-			this.flinkConfig = flinkConfig;
-			this.actorSystemLoader = actorSystemLoader;
+				LazyActorSystemLoader actorSystemLoader,
+				HighAvailabilityServices highAvailabilityServices) {
+			this.flinkConfig = Preconditions.checkNotNull(flinkConfig, "flinkConfig");
+			this.actorSystemLoader = Preconditions.checkNotNull(actorSystemLoader, "actorSystemLoader");
+			this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices, "highAvailabilityServices");
 		}
 
 		/**
@@ -561,14 +571,6 @@ public class YarnClusterClient extends ClusterClient {
 		 */
 		public ActorRef get() {
 			if (applicationClient == null) {
-				/* The leader retrieval service for connecting to the cluster and finding the active leader. */
-				LeaderRetrievalService leaderRetrievalService;
-				try {
-					leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true);
-				} catch (Exception e) {
-					throw new RuntimeException("Could not create the leader retrieval service.", e);
-				}
-
 				// start application client
 				LOG.info("Start application client.");
 
@@ -576,7 +578,7 @@ public class YarnClusterClient extends ClusterClient {
 					Props.create(
 						ApplicationClient.class,
 						flinkConfig,
-						leaderRetrievalService),
+						highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID)),
 					"applicationClient");
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
index 5a3a3c0..33d5987 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.List;
@@ -58,11 +57,11 @@ public class YarnClusterClientV2 extends ClusterClient {
 	 *
 	 * @param clusterDescriptor The descriptor used to create yarn job
 	 * @param flinkConfig Flink configuration
-	 * @throws java.io.IOException
+	 * @throws Exception if the cluster client could not be created
 	 */
 	public YarnClusterClientV2(
 			final AbstractYarnClusterDescriptor clusterDescriptor,
-			org.apache.flink.configuration.Configuration flinkConfig) throws IOException {
+			org.apache.flink.configuration.Configuration flinkConfig) throws Exception {
 
 		super(flinkConfig);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index 05b2be8..3f4d4f6 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -138,9 +138,10 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 				LOG.info("Starting High Availability Services");
 				commonRpcService = createRpcService(config, appMasterHostname, amPortRange);
 
-				haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+				haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
 					config,
-					commonRpcService.getExecutor());
+					commonRpcService.getExecutor(),
+					HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
 
 				heartbeatServices = HeartbeatServices.fromConfiguration(config);
 				

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
index ca5049c..557fa38 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
@@ -33,7 +33,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URL;
 import java.net.URLDecoder;
@@ -224,22 +223,14 @@ public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
 			String applicationName,
 			CommandLine cmdLine,
 			Configuration config,
-			List<URL> userJarFiles) {
+			List<URL> userJarFiles) throws Exception {
 		Preconditions.checkNotNull(userJarFiles, "User jar files should not be null.");
 
 		YarnClusterDescriptorV2 yarnClusterDescriptor = createDescriptor(applicationName, cmdLine);
 		yarnClusterDescriptor.setFlinkConfiguration(config);
 		yarnClusterDescriptor.setProvidedUserJarFiles(userJarFiles);
 
-		YarnClusterClientV2 client = null;
-		try {
-			client = new YarnClusterClientV2(yarnClusterDescriptor, config);
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Fail to create YarnClusterClientV2", e.getCause());
-		}
-		return client;
-
+		return new YarnClusterClientV2(yarnClusterDescriptor, config);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index f4557c9..9277d21 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -678,7 +678,12 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 	public void stop() {
 		if (yarnCluster != null) {
 			LOG.info("Command line interface is shutting down the yarnCluster");
-			yarnCluster.shutdown();
+
+			try {
+				yarnCluster.shutdown();
+			} catch (Throwable t) {
+				LOG.warn("Could not properly shutdown the yarn cluster.", t);
+			}
 		}
 	}
 

