flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-7457] Make Dispatcher highly available
Date Sun, 03 Sep 2017 21:01:37 GMT
Repository: flink
Updated Branches:
  refs/heads/master a00830318 -> fb3bd1fce


[FLINK-7457] Make Dispatcher highly available

This commit adds a dispatcher leader election service and a dispatcher leader retrieval service
to the HighAvailabilityServices. Moreover, it makes the Dispatcher take part in the leader
election process using the aforementioned services.

Let Dispatcher participate in leader election

Add test for Dispatcher leader election

This closes #4548.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb3bd1fc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb3bd1fc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb3bd1fc

Branch: refs/heads/master
Commit: fb3bd1fceba6da362966491e55c3bf27566ede53
Parents: a008303
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Aug 16 14:36:13 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Sun Sep 3 22:59:25 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    | 152 ++++++++++++++++++-
 .../runtime/dispatcher/DispatcherException.java |  40 +++++
 .../runtime/dispatcher/DispatcherGateway.java   |   3 +
 .../HighAvailabilityServices.java               |  13 ++
 .../HighAvailabilityServicesUtils.java          |  12 +-
 .../nonha/embedded/EmbeddedHaServices.java      |  13 ++
 .../nonha/standalone/StandaloneHaServices.java  |  27 +++-
 .../zookeeper/ZooKeeperHaServices.java          |  12 ++
 .../org/apache/flink/runtime/rpc/RpcUtils.java  |  22 +++
 .../runtime/dispatcher/DispatcherTest.java      | 114 +++++++++++---
 .../TestingHighAvailabilityServices.java        |  33 ++++
 .../TestingManualHighAvailabilityServices.java  |  13 ++
 .../standalone/StandaloneHaServicesTest.java    |   6 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |   2 +
 .../YarnIntraNonHaMasterServices.java           |  25 +++
 .../YarnPreConfiguredMasterNonHaServices.java   |  32 ++++
 16 files changed, 492 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index bb0b3e4..29262cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -28,17 +28,21 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -47,6 +51,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -55,7 +60,7 @@ import java.util.concurrent.CompletableFuture;
  * the jobs and to recover them in case of a master failure. Furthermore, it knows
  * about the state of the Flink session cluster.
  */
-public abstract class Dispatcher extends RpcEndpoint implements DispatcherGateway {
+public abstract class Dispatcher extends RpcEndpoint implements DispatcherGateway, LeaderContender
{
 
 	public static final String DISPATCHER_NAME = "dispatcher";
 
@@ -73,6 +78,10 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
 
 	private final Map<JobID, JobManagerRunner> jobManagerRunners;
 
+	private final LeaderElectionService leaderElectionService;
+
+	private volatile UUID leaderSessionId;
+
 	protected Dispatcher(
 			RpcService rpcService,
 			String endpointId,
@@ -95,6 +104,11 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
 		this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
 
 		jobManagerRunners = new HashMap<>(16);
+
+		leaderElectionService = highAvailabilityServices.getDispatcherLeaderElectionService();
+
+		// we are not the leader when this object is created
+		leaderSessionId = null;
 	}
 
 	//------------------------------------------------------
@@ -104,12 +118,8 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
 	@Override
 	public void postStop() throws Exception {
 		Exception exception = null;
-		// stop all currently running JobManagerRunners
-		for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) {
-			jobManagerRunner.shutdown();
-		}
 
-		jobManagerRunners.clear();
+		clearState();
 
 		try {
 			submittedJobGraphStore.stop();
@@ -118,6 +128,12 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
 		}
 
 		try {
+			leaderElectionService.stop();
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
+		}
+
+		try {
 			super.postStop();
 		} catch (Exception e) {
 			exception = ExceptionUtils.firstOrSuppressed(e, exception);
@@ -128,12 +144,26 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
 		}
 	}
 
+	@Override
+	public void start() throws Exception {
+		super.start();
+
+		leaderElectionService.start(this);
+	}
+
 	//------------------------------------------------------
 	// RPCs
 	//------------------------------------------------------
 
 	@Override
-	public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
+	public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, UUID leaderSessionId,
Time timeout) {
+
+		try {
+			validateLeaderSessionId(leaderSessionId);
+		} catch (LeaderIdMismatchException e) {
+			return FutureUtils.completedExceptionally(e);
+		}
+
 		final JobID jobId = jobGraph.getJobID();
 
 		log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
@@ -224,6 +254,62 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
 		// TODO: remove job related files from blob server
 	}
 
+	/**
+	 * Clears the state of the dispatcher.
+	 *
+	 * <p>The state consists of all currently running jobs.
+	 */
+	private void clearState() {
+		// stop all currently running JobManagers since they run in the same process
+		for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) {
+			jobManagerRunner.shutdown();
+		}
+
+		jobManagerRunners.clear();
+	}
+
+	/**
+	 * Recovers all jobs persisted via the submitted job graph store.
+	 */
+	private void recoverJobs() {
+		log.info("Recovering all persisted jobs.");
+
+		final UUID currentLeaderSessionId = leaderSessionId;
+
+		getRpcService().execute(
+			() -> {
+				final Collection<JobID> jobIds;
+
+				try {
+					jobIds = submittedJobGraphStore.getJobIds();
+				} catch (Exception e) {
+					log.error("Could not recover job ids from the submitted job graph store. Aborting recovery.",
e);
+					return;
+				}
+
+				for (JobID jobId : jobIds) {
+					try {
+						SubmittedJobGraph submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId);
+
+						runAsync(() -> submitJob(submittedJobGraph.getJobGraph(), currentLeaderSessionId,
RpcUtils.INF_TIMEOUT));
+					} catch (Exception e) {
+						log.error("Could not recover the job graph for " + jobId + '.', e);
+					}
+				}
+			});
+	}
+
+	private void onFatalError(Throwable throwable) {
+		log.error("Fatal error occurred in dispatcher {}.", getAddress(), throwable);
+		fatalErrorHandler.onFatalError(throwable);
+	}
+
+	private void validateLeaderSessionId(UUID leaderSessionID) throws LeaderIdMismatchException
{
+		if (this.leaderSessionId == null || !this.leaderSessionId.equals(leaderSessionID)) {
+			throw new LeaderIdMismatchException(this.leaderSessionId, leaderSessionID);
+		}
+	}
+
 	protected abstract JobManagerRunner createJobManagerRunner(
 		ResourceID resourceId,
 		JobGraph jobGraph,
@@ -237,6 +323,58 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
 		FatalErrorHandler fatalErrorHandler) throws Exception;
 
 	//------------------------------------------------------
+	// Leader contender
+	//------------------------------------------------------
+
+	/**
+	 * Callback method when the current dispatcher is granted leadership.
+	 *
+	 * @param newLeaderSessionID the leader session ID granted to this dispatcher
+	 */
+	@Override
+	public void grantLeadership(final UUID newLeaderSessionID) {
+		runAsync(
+			() -> {
+				log.info("Dispatcher {} was granted leadership with leader session ID {}", getAddress(),
newLeaderSessionID);
+
+				// clear the state if we've been the leader before
+				if (leaderSessionId != null) {
+					clearState();
+				}
+
+				leaderSessionId = newLeaderSessionID;
+
+				// confirming the leader session ID might be blocking, so hand it off via getRpcService().execute
+				getRpcService().execute(
+					() -> leaderElectionService.confirmLeaderSessionID(newLeaderSessionID));
+
+				recoverJobs();
+			});
+	}
+
+	/**
+	 * Callback method when the current dispatcher loses leadership.
+	 */
+	@Override
+	public void revokeLeadership() {
+		runAsync(
+			() -> {
+				log.info("Dispatcher {} was revoked leadership.", getAddress());
+				clearState();
+			});
+	}
+
+	/**
+	 * Handles error occurring in the leader election service.
+	 *
+	 * @param exception Exception being thrown in the leader election service
+	 */
+	@Override
+	public void handleError(final Exception exception) {
+		onFatalError(new DispatcherException("Received an error from the LeaderElectionService.",
exception));
+	}
+
+	//------------------------------------------------------
 	// Utility classes
 	//------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherException.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherException.java
new file mode 100644
index 0000000..cf4a493
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Base class for {@link Dispatcher} related exceptions.
+ */
+public class DispatcherException extends FlinkException {
+	private static final long serialVersionUID = 3781733042984381286L;
+
+	public DispatcherException(String message) {
+		super(message);
+	}
+
+	public DispatcherException(Throwable cause) {
+		super(cause);
+	}
+
+	public DispatcherException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
index 33b8a42..669f616 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 
 import java.util.Collection;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -37,11 +38,13 @@ public interface DispatcherGateway extends RpcGateway {
 	 * Submit a job to the dispatcher.
 	 *
 	 * @param jobGraph JobGraph to submit
+	 * @param leaderSessionId leader session id
 	 * @param timeout RPC timeout
 	 * @return A future acknowledge if the submission succeeded
 	 */
 	CompletableFuture<Acknowledge> submitJob(
 		JobGraph jobGraph,
+		UUID leaderSessionId,
 		@RpcTimeout Time timeout);
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index b44905e..defe5cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -73,6 +73,12 @@ public interface HighAvailabilityServices extends AutoCloseable {
 	LeaderRetrievalService getResourceManagerLeaderRetriever();
 
 	/**
+	 * Gets the leader retriever for the dispatcher. This leader retrieval service
+	 * is not always accessible.
+	 */
+	LeaderRetrievalService getDispatcherLeaderRetriever();
+
+	/**
 	 * Gets the leader retriever for the job JobMaster which is responsible for the given job
 	 *
 	 * @param jobID The identifier of the job.
@@ -100,6 +106,13 @@ public interface HighAvailabilityServices extends AutoCloseable {
 	LeaderElectionService getResourceManagerLeaderElectionService();
 
 	/**
+	 * Gets the leader election service for the cluster's dispatcher.
+	 *
+	 * @return Leader election service for the dispatcher leader election
+	 */
+	LeaderElectionService getDispatcherLeaderElectionService();
+
+	/**
 	 * Gets the leader election service for the given job.
 	 *
 	 * @param jobID The identifier of the job running the election.

http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index 2ebfd20..7a89ed8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.blob.BlobStoreService;
 import org.apache.flink.runtime.blob.BlobUtils;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
 import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
 import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
@@ -87,8 +88,17 @@ public class HighAvailabilityServicesUtils {
 					ResourceManager.RESOURCE_MANAGER_NAME,
 					addressResolution,
 					configuration);
+				final String dispatcherRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
+					hostnamePort.f0,
+					hostnamePort.f1,
+					Dispatcher.DISPATCHER_NAME,
+					addressResolution,
+					configuration);
 
-				return new StandaloneHaServices(resourceManagerRpcUrl, jobManagerRpcUrl);
+				return new StandaloneHaServices(
+					resourceManagerRpcUrl,
+					dispatcherRpcUrl,
+					jobManagerRpcUrl);
 			case ZOOKEEPER:
 				BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
index 76eb681..4c30f87 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
@@ -45,11 +45,14 @@ public class EmbeddedHaServices extends AbstractNonHaServices {
 
 	private final EmbeddedLeaderService resourceManagerLeaderService;
 
+	private final EmbeddedLeaderService dispatcherLeaderService;
+
 	private final HashMap<JobID, EmbeddedLeaderService> jobManagerLeaderServices;
 
 	public EmbeddedHaServices(Executor executor) {
 		this.executor = Preconditions.checkNotNull(executor);
 		this.resourceManagerLeaderService = new EmbeddedLeaderService(executor);
+		this.dispatcherLeaderService = new EmbeddedLeaderService(executor);
 		this.jobManagerLeaderServices = new HashMap<>();
 	}
 
@@ -63,11 +66,21 @@ public class EmbeddedHaServices extends AbstractNonHaServices {
 	}
 
 	@Override
+	public LeaderRetrievalService getDispatcherLeaderRetriever() {
+		return dispatcherLeaderService.createLeaderRetrievalService();
+	}
+
+	@Override
 	public LeaderElectionService getResourceManagerLeaderElectionService() {
 		return resourceManagerLeaderService.createLeaderElectionService();
 	}
 
 	@Override
+	public LeaderElectionService getDispatcherLeaderElectionService() {
+		return dispatcherLeaderService.createLeaderElectionService();
+	}
+
+	@Override
 	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
 		checkNotNull(jobID);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
index b3c6ee5..617b351 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
@@ -45,6 +45,9 @@ public class StandaloneHaServices extends AbstractNonHaServices {
 	/** The fix address of the ResourceManager */
 	private final String resourceManagerAddress;
 
+	/** The fix address of the Dispatcher */
+	private final String dispatcherAddress;
+
 	/** The fix address of the JobManager */
 	private final String jobManagerAddress;
 
@@ -53,8 +56,12 @@ public class StandaloneHaServices extends AbstractNonHaServices {
 	 * 
 	 * @param resourceManagerAddress    The fix address of the ResourceManager
 	 */
-	public StandaloneHaServices(String resourceManagerAddress, String jobManagerAddress) {
+	public StandaloneHaServices(
+		String resourceManagerAddress,
+		String dispatcherAddress,
+		String jobManagerAddress) {
 		this.resourceManagerAddress = checkNotNull(resourceManagerAddress, "resourceManagerAddress");
+		this.dispatcherAddress = checkNotNull(dispatcherAddress, "dispatcherAddress");
 		this.jobManagerAddress = checkNotNull(jobManagerAddress, "jobManagerAddress");
 	}
 
@@ -73,6 +80,15 @@ public class StandaloneHaServices extends AbstractNonHaServices {
 	}
 
 	@Override
+	public LeaderRetrievalService getDispatcherLeaderRetriever() {
+		synchronized (lock) {
+			checkNotShutdown();
+
+			return new StandaloneLeaderRetrievalService(dispatcherAddress, DEFAULT_LEADER_ID);
+		}
+	}
+
+	@Override
 	public LeaderElectionService getResourceManagerLeaderElectionService() {
 		synchronized (lock) {
 			checkNotShutdown();
@@ -82,6 +98,15 @@ public class StandaloneHaServices extends AbstractNonHaServices {
 	}
 
 	@Override
+	public LeaderElectionService getDispatcherLeaderElectionService() {
+		synchronized (lock) {
+			checkNotShutdown();
+
+			return new StandaloneLeaderElectionService();
+		}
+	}
+
+	@Override
 	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
 		synchronized (lock) {
 			checkNotShutdown();

http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index 9dabfa2..04ab6d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -82,6 +82,8 @@ public class ZooKeeperHaServices implements HighAvailabilityServices {
 
 	private static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock";
 
+	private static final String DISPATCHER_LEADER_PATH = "/dispatcher_lock";
+
 	private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";
 
 	// ------------------------------------------------------------------------
@@ -125,6 +127,11 @@ public class ZooKeeperHaServices implements HighAvailabilityServices
{
 	}
 
 	@Override
+	public LeaderRetrievalService getDispatcherLeaderRetriever() {
+		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, DISPATCHER_LEADER_PATH);
+	}
+
+	@Override
 	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
 		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
 	}
@@ -140,6 +147,11 @@ public class ZooKeeperHaServices implements HighAvailabilityServices
{
 	}
 
 	@Override
+	public LeaderElectionService getDispatcherLeaderElectionService() {
+		return ZooKeeperUtils.createLeaderElectionService(client, configuration, DISPATCHER_LEADER_PATH);
+	}
+
+	@Override
 	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
 		return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
index 9738970..a644efd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
@@ -18,13 +18,21 @@
 
 package org.apache.flink.runtime.rpc;
 
+import org.apache.flink.api.common.time.Time;
+
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
  * Utility functions for Flink's RPC implementation
  */
 public class RpcUtils {
+
+	public static final Time INF_TIMEOUT = Time.milliseconds(Long.MAX_VALUE);
+
 	/**
 	 * Extracts all {@link RpcGateway} interfaces implemented by the given clazz.
 	 *
@@ -47,6 +55,20 @@ public class RpcUtils {
 		return interfaces;
 	}
 
+	/**
+	 * Shuts the given {@link RpcEndpoint} down and awaits its termination.
+	 *
+	 * @param rpcEndpoint to terminate
+	 * @param timeout for this operation
+	 * @throws ExecutionException if a problem occurs
+	 * @throws InterruptedException if the operation has been interrupted
+	 * @throws TimeoutException if a timeout occurred
+	 */
+	public static void terminateRpcEndpoint(RpcEndpoint rpcEndpoint, Time timeout) throws ExecutionException,
InterruptedException, TimeoutException {
+		rpcEndpoint.shutDown();
+		rpcEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+	}
+
 	// We don't want this class to be instantiable
 	private RpcUtils() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 4237327..091608c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -25,24 +25,33 @@ import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -52,6 +61,23 @@ import static org.mockito.Mockito.when;
  */
 public class DispatcherTest extends TestLogger {
 
+	private static RpcService rpcService;
+	private static final Time timeout = Time.seconds(10L);
+
+	@BeforeClass
+	public static void setup() {
+		rpcService = new TestingRpcService();
+	}
+
+	@AfterClass
+	public static void teardown() {
+		if (rpcService != null) {
+			rpcService.stopService();
+
+			rpcService = null;
+		}
+	}
+
 	/**
 	 * Tests that we can submit a job to the Dispatcher which then spawns a
 	 * new JobManagerRunner.
@@ -59,34 +85,35 @@ public class DispatcherTest extends TestLogger {
 	@Test
 	public void testJobSubmission() throws Exception {
 		TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
-		RpcService rpcService = new TestingRpcService();
-		HighAvailabilityServices haServices = new StandaloneHaServices("localhost", "localhost");
+		HighAvailabilityServices haServices = new StandaloneHaServices(
+			"localhost",
+			"localhost",
+			"localhost");
 		HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L);
 		JobManagerRunner jobManagerRunner = mock(JobManagerRunner.class);
 
-		final Time timeout = Time.seconds(5L);
 		final JobGraph jobGraph = mock(JobGraph.class);
 		final JobID jobId = new JobID();
 		when(jobGraph.getJobID()).thenReturn(jobId);
 
-		try {
-			final TestingDispatcher dispatcher = new TestingDispatcher(
-				rpcService,
-				Dispatcher.DISPATCHER_NAME,
-				new Configuration(),
-				haServices,
-				mock(BlobServer.class),
-				heartbeatServices,
-				mock(MetricRegistry.class),
-				fatalErrorHandler,
-				jobManagerRunner,
-				jobId);
+		final TestingDispatcher dispatcher = new TestingDispatcher(
+			rpcService,
+			Dispatcher.DISPATCHER_NAME,
+			new Configuration(),
+			haServices,
+			mock(BlobServer.class),
+			heartbeatServices,
+			mock(MetricRegistry.class),
+			fatalErrorHandler,
+			jobManagerRunner,
+			jobId);
 
+		try {
 			dispatcher.start();
 
 			DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
 
-			CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout);
+			CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, HighAvailabilityServices.DEFAULT_LEADER_ID, timeout);
 
 			acknowledgeFuture.get();
 
@@ -95,7 +122,63 @@ public class DispatcherTest extends TestLogger {
 			// check that no error has occurred
 			fatalErrorHandler.rethrowError();
 		} finally {
-			rpcService.stopService();
+			RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
+		}
+	}
+
+	/**
+	 * Tests that the dispatcher takes part in the leader election.
+	 */
+	@Test
+	public void testLeaderElection() throws Exception {
+		TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
+		TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+
+		UUID expectedLeaderSessionId = UUID.randomUUID();
+		CompletableFuture<UUID> leaderSessionIdFuture = new CompletableFuture<>();
+		SubmittedJobGraphStore mockSubmittedJobGraphStore = mock(SubmittedJobGraphStore.class);
+		TestingLeaderElectionService testingLeaderElectionService = new TestingLeaderElectionService() {
+			@Override
+			public void confirmLeaderSessionID(UUID leaderSessionId) {
+				super.confirmLeaderSessionID(leaderSessionId);
+				leaderSessionIdFuture.complete(leaderSessionId);
+			}
+		};
+
+		haServices.setSubmittedJobGraphStore(mockSubmittedJobGraphStore);
+		haServices.setDispatcherLeaderElectionService(testingLeaderElectionService);
+		HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
+		final JobID jobId = new JobID();
+
+		final TestingDispatcher dispatcher = new TestingDispatcher(
+			rpcService,
+			Dispatcher.DISPATCHER_NAME,
+			new Configuration(),
+			haServices,
+			mock(BlobServer.class),
+			heartbeatServices,
+			mock(MetricRegistry.class),
+			fatalErrorHandler,
+			mock(JobManagerRunner.class),
+			jobId);
+
+		try {
+			dispatcher.start();
+
+			assertFalse(leaderSessionIdFuture.isDone());
+
+			testingLeaderElectionService.isLeader(expectedLeaderSessionId);
+
+			UUID actualLeaderSessionId = leaderSessionIdFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+			assertEquals(expectedLeaderSessionId, actualLeaderSessionId);
+
+			verify(mockSubmittedJobGraphStore, Mockito.timeout(timeout.toMilliseconds()).atLeast(1)).getJobIds();
+
+			// check that no error has occurred
+			fatalErrorHandler.rethrowError();
+		} finally {
+			RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 0a7e9c8..dba7bef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -38,12 +38,16 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 
 	private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
 
+	private volatile LeaderRetrievalService dispatcherLeaderRetriever; // null until setDispatcherLeaderRetriever is called
+
 	private ConcurrentHashMap<JobID, LeaderRetrievalService> jobMasterLeaderRetrievers = new ConcurrentHashMap<>();
 
 	private ConcurrentHashMap<JobID, LeaderElectionService> jobManagerLeaderElectionServices = new ConcurrentHashMap<>();
 
 	private volatile LeaderElectionService resourceManagerLeaderElectionService;
 
+	private volatile LeaderElectionService dispatcherLeaderElectionService; // null until setDispatcherLeaderElectionService is called
+
 	private volatile CheckpointRecoveryFactory checkpointRecoveryFactory;
 
 	private volatile SubmittedJobGraphStore submittedJobGraphStore;
@@ -56,6 +60,10 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 		this.resourceManagerLeaderRetriever = resourceManagerLeaderRetriever;
 	}
 
+	public void setDispatcherLeaderRetriever(LeaderRetrievalService dispatcherLeaderRetriever) {
+		this.dispatcherLeaderRetriever = dispatcherLeaderRetriever; // returned by getDispatcherLeaderRetriever()
+	}
+
 	public void setJobMasterLeaderRetriever(JobID jobID, LeaderRetrievalService jobMasterLeaderRetriever) {
 		this.jobMasterLeaderRetrievers.put(jobID, jobMasterLeaderRetriever);
 	}
@@ -68,6 +76,10 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 		this.resourceManagerLeaderElectionService = leaderElectionService;
 	}
 
+	public void setDispatcherLeaderElectionService(LeaderElectionService leaderElectionService) {
+		this.dispatcherLeaderElectionService = leaderElectionService; // returned by getDispatcherLeaderElectionService()
+	}
+
 	public void setCheckpointRecoveryFactory(CheckpointRecoveryFactory checkpointRecoveryFactory) {
 		this.checkpointRecoveryFactory = checkpointRecoveryFactory;
 	}
@@ -91,6 +103,16 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	}
 
 	@Override
+	public LeaderRetrievalService getDispatcherLeaderRetriever() {
+		LeaderRetrievalService service = this.dispatcherLeaderRetriever;
+		if (service != null) {
+			return service;
+		} else {
+			throw new IllegalStateException("DispatcherLeaderRetriever has not been set"); // fail fast if the test never called setDispatcherLeaderRetriever
+		}
+	}
+
+	@Override
 	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
 		LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID);
 		if (service != null) {
@@ -117,6 +139,17 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	}
 
 	@Override
+	public LeaderElectionService getDispatcherLeaderElectionService() {
+		// read the volatile field once so the null check and the return see the same value
+		final LeaderElectionService electionService = this.dispatcherLeaderElectionService;
+
+		if (electionService == null) {
+			throw new IllegalStateException("DispatcherLeaderElectionService has not been set");
+		}
+		return electionService;
+	}
+
+	@Override
 	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
 		LeaderElectionService service = this.jobManagerLeaderElectionServices.get(jobID);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
index 0735d17..1f319eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
@@ -45,9 +45,12 @@ public class TestingManualHighAvailabilityServices implements HighAvailabilitySe
 
 	private final ManualLeaderService resourceManagerLeaderService;
 
+	private final ManualLeaderService dispatcherLeaderService; // backs both the dispatcher's leader election and leader retrieval services
+
 	public TestingManualHighAvailabilityServices() {
 		jobManagerLeaderServices = new HashMap<>(4);
 		resourceManagerLeaderService = new ManualLeaderService();
+		dispatcherLeaderService = new ManualLeaderService(); // one shared instance for the dispatcher
 	}
 
 	@Override
@@ -56,6 +59,11 @@ public class TestingManualHighAvailabilityServices implements HighAvailabilitySe
 	}
 
 	@Override
+	public LeaderRetrievalService getDispatcherLeaderRetriever() {
+		return this.dispatcherLeaderService.createLeaderRetrievalService(); // delegate to the dispatcher's ManualLeaderService
+	}
+
+	@Override
 	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
 		ManualLeaderService leaderService = getOrCreateJobManagerLeaderService(jobID);
 
@@ -73,6 +81,11 @@ public class TestingManualHighAvailabilityServices implements HighAvailabilitySe
 	}
 
 	@Override
+	public LeaderElectionService getDispatcherLeaderElectionService() {
+		return this.dispatcherLeaderService.createLeaderElectionService(); // delegate to the dispatcher's ManualLeaderService
+	}
+
+	@Override
 	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
 		ManualLeaderService leaderService = getOrCreateJobManagerLeaderService(jobID);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
index 2d51360..1cf2e5b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
@@ -39,6 +39,7 @@ import static org.mockito.Mockito.verify;
 public class StandaloneHaServicesTest extends TestLogger {
 
 	private final String jobManagerAddress = "jobManager";
+	private final String dispatcherAddress = "dispatcher"; // passed as the second StandaloneHaServices constructor argument
 	private final String resourceManagerAddress = "resourceManager";
 
 	private StandaloneHaServices standaloneHaServices;
@@ -46,7 +47,10 @@ public class StandaloneHaServicesTest extends TestLogger {
 	@Before
 	public void setupTest() {
 
-		standaloneHaServices = new StandaloneHaServices(resourceManagerAddress, jobManagerAddress);
+		standaloneHaServices = new StandaloneHaServices(
+			resourceManagerAddress,
+			dispatcherAddress, // new dispatcher address argument, between RM and JM addresses
+			jobManagerAddress);
 	}
 
 	@After

http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 6842bee..e622130 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -462,6 +462,7 @@ public class TaskExecutorTest extends TestLogger {
 		final ResourceID resourceID = ResourceID.generate();
 		final String resourceManagerAddress = "/resource/manager/address/one";
 		final ResourceID resourceManagerResourceId = new ResourceID(resourceManagerAddress);
+		final String dispatcherAddress = "localhost";
 		final String jobManagerAddress = "localhost";
 
 		final TestingRpcService rpc = new TestingRpcService();
@@ -483,6 +484,7 @@ public class TaskExecutorTest extends TestLogger {
 
 			StandaloneHaServices haServices = new StandaloneHaServices(
 				resourceManagerAddress,
+				dispatcherAddress,
 				jobManagerAddress);
 
 			final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
index 75f8c0a..86db1c4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
@@ -71,6 +71,9 @@ public class YarnIntraNonHaMasterServices extends AbstractYarnNonHaServices {
 	/** The embedded leader election service used by JobManagers to find the resource manager. */
 	private final SingleLeaderElectionService resourceManagerLeaderElectionService;
 
+	/** The embedded leader election service for the dispatcher; dispatcher leader retrievers are derived from it. */
+	private final SingleLeaderElectionService dispatcherLeaderElectionService;
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -100,6 +103,7 @@ public class YarnIntraNonHaMasterServices extends AbstractYarnNonHaServices {
 		try {
 			this.dispatcher = Executors.newSingleThreadExecutor(new ServicesThreadFactory());
 			this.resourceManagerLeaderElectionService = new SingleLeaderElectionService(dispatcher,
DEFAULT_LEADER_ID);
+			this.dispatcherLeaderElectionService = new SingleLeaderElectionService(dispatcher, DEFAULT_LEADER_ID);
 
 			// all good!
 			successful = true;
@@ -130,6 +134,17 @@ public class YarnIntraNonHaMasterServices extends AbstractYarnNonHaServices {
 	}
 
 	@Override
+	public LeaderRetrievalService getDispatcherLeaderRetriever() {
+		enter();
+
+		try {
+			return dispatcherLeaderElectionService.createLeaderRetrievalService(); // derived from the single dispatcher election service
+		} finally {
+			exit();
+		}
+	}
+
+	@Override
 	public LeaderElectionService getResourceManagerLeaderElectionService() {
 		enter();
 		try {
@@ -141,6 +156,16 @@ public class YarnIntraNonHaMasterServices extends AbstractYarnNonHaServices {
 	}
 
 	@Override
+	public LeaderElectionService getDispatcherLeaderElectionService() {
+		enter();
+		try {
+			return dispatcherLeaderElectionService; // the same instance on every call (final field created in the constructor)
+		} finally {
+			exit();
+		}
+	}
+
+	@Override
 	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
 		enter();
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/fb3bd1fc/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
index 6686a52..c1466d2 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn.highavailability;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
@@ -66,6 +67,9 @@ public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServi
 	/** The RPC URL under which the single ResourceManager can be reached while available. */
 	private final String resourceManagerRpcUrl;
 
+	/** The RPC URL under which the single Dispatcher can be reached while available; built from the ResourceManager's host/port (presumably assumes both share one RPC service). */
+	private final String dispatcherRpcUrl;
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -116,6 +120,13 @@ public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServi
 				addressResolution,
 				config);
 
+			this.dispatcherRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
+				rmHost,
+				rmPort,
+				Dispatcher.DISPATCHER_NAME,
+				addressResolution,
+				config);
+
 			// all well!
 			successful = true;
 		}
@@ -145,6 +156,17 @@ public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServi
 	}
 
 	@Override
+	public LeaderRetrievalService getDispatcherLeaderRetriever() {
+		enter();
+
+		try {
+			return new StandaloneLeaderRetrievalService(dispatcherRpcUrl, DEFAULT_LEADER_ID); // non-HA: fixed address, fixed leader id
+		} finally {
+			exit();
+		}
+	}
+
+	@Override
 	public LeaderElectionService getResourceManagerLeaderElectionService() {
 		enter();
 		try {
@@ -156,6 +178,16 @@ public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServi
 	}
 
 	@Override
+	public LeaderElectionService getDispatcherLeaderElectionService() {
+		enter();
+		try {
+			throw new UnsupportedOperationException("Not supported on the TaskManager side"); // NOTE(review): presumably these services only resolve pre-configured master addresses
+		} finally {
+			exit();
+		}
+	}
+
+	@Override
 	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
 		enter();
 		try {


Mime
View raw message