flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-6120] [heartbeat] Implement heartbeat logic between JobManager and ResourceManager
Date Thu, 27 Apr 2017 16:04:35 GMT
Repository: flink
Updated Branches:
  refs/heads/master bb972b85a -> 81114d5f7


[FLINK-6120] [heartbeat] Implement heartbeat logic between JobManager and ResourceManager

This closes #3645.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81114d5f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81114d5f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81114d5f

Branch: refs/heads/master
Commit: 81114d5f71ee05908a93ebd21475dcec1f5eed09
Parents: bb972b8
Author: Zhijiang <wangzhijiang999@aliyun.com>
Authored: Thu Mar 30 00:30:29 2017 +0800
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Apr 27 17:56:51 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobMaster.java      | 153 +++++++++++++++----
 .../runtime/jobmaster/JobMasterGateway.java     |   7 +
 .../jobmaster/JobMasterRegistrationSuccess.java |  14 +-
 .../resourcemanager/ResourceManager.java        | 133 +++++++++++++---
 .../resourcemanager/ResourceManagerGateway.java |  18 ++-
 .../registration/JobManagerRegistration.java    |   9 ++
 .../clusterframework/ResourceManagerTest.java   | 116 +++++++++++++-
 .../flink/runtime/jobmaster/JobMasterTest.java  |  89 ++++++++++-
 .../ResourceManagerJobMasterTest.java           |  42 ++++-
 .../slotmanager/SlotProtocolTest.java           |   6 +-
 .../taskexecutor/TaskExecutorITCase.java        |   8 +-
 11 files changed, 521 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 6fe8cb3..ab43577 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -155,7 +155,10 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	private final MetricGroup jobMetricGroup;
 
 	/** The heartbeat manager with task managers */
-	private final HeartbeatManager<Void, Void> heartbeatManager;
+	private final HeartbeatManager<Void, Void> taskManagerHeartbeatManager;
+
+	/** The heartbeat manager with resource manager */
+	private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
 
 	/** The execution context which is used to execute futures */
 	private final Executor executor;
@@ -218,12 +221,18 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		this.errorHandler = checkNotNull(errorHandler);
 		this.userCodeLoader = checkNotNull(userCodeLoader);
 
-		this.heartbeatManager = heartbeatServices.createHeartbeatManagerSender(
+		this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
 			resourceId,
 			new TaskManagerHeartbeatListener(),
 			rpcService.getScheduledExecutor(),
 			log);
 
+		this.resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
+				resourceId,
+				new ResourceManagerHeartbeatListener(),
+				rpcService.getScheduledExecutor(),
+				log);
+
 		final String jobName = jobGraph.getName();
 		final JobID jid = jobGraph.getJobID();
 
@@ -309,7 +318,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	 */
 	@Override
 	public void shutDown() throws Exception {
-		heartbeatManager.stop();
+		taskManagerHeartbeatManager.stop();
+		resourceManagerHeartbeatManager.stop();
 
 		// make sure there is a graceful exit
 		getSelf().suspendExecution(new Exception("JobManager is shutting down."));
@@ -407,7 +417,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		slotPoolGateway.suspend();
 
 		// disconnect from resource manager:
-		closeResourceManagerConnection();
+		closeResourceManagerConnection(new Exception("Execution was suspended.", cause));
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -534,7 +544,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 	@RpcMethod
 	public void disconnectTaskManager(final ResourceID resourceID, final Exception cause) {
-		heartbeatManager.unmonitorTarget(resourceID);
+		taskManagerHeartbeatManager.unmonitorTarget(resourceID);
 		slotPoolGateway.releaseTaskManager(resourceID);
 
 		Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerConnection = registeredTaskManagers.remove(resourceID);
@@ -766,7 +776,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 					registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));
 
 					// monitor the task manager as heartbeat target
-					heartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>() {
+					taskManagerHeartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>() {
 						@Override
 						public void receiveHeartbeat(ResourceID resourceID, Void payload) {
 							// the task manager will not request heartbeat, so this method will never be called currently
@@ -788,13 +798,24 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	public void disconnectResourceManager(
 			final UUID jobManagerLeaderId,
 			final UUID resourceManagerLeaderId,
-			final Exception cause) {
-		// TODO: Implement disconnect behaviour
+			final Exception cause) throws Exception {
+
+		validateLeaderSessionId(jobManagerLeaderId);
+
+		if (resourceManagerConnection != null
+				&& resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderId)) {
+			closeResourceManagerConnection(cause);
+		}
 	}
 
 	@RpcMethod
 	public void heartbeatFromTaskManager(final ResourceID resourceID) {
-		heartbeatManager.receiveHeartbeat(resourceID, null);
+		taskManagerHeartbeatManager.receiveHeartbeat(resourceID, null);
+	}
+
+	@RpcMethod
+	public void heartbeatFromResourceManager(final ResourceID resourceID) {
+		resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -872,56 +893,79 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		}
 	}
 
-	private void notifyOfNewResourceManagerLeader(
-			final String resourceManagerAddress, final UUID resourceManagerLeaderId)
-	{
-		validateRunsInMainThread();
-
+	private void notifyOfNewResourceManagerLeader(final String resourceManagerAddress, final UUID resourceManagerLeaderId) {
 		if (resourceManagerConnection != null) {
 			if (resourceManagerAddress != null) {
 				if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
-						&& resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) {
+					&& resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) {
 					// both address and leader id are not changed, we can keep the old connection
 					return;
 				}
+
+				closeResourceManagerConnection(new Exception(
+					"ResourceManager leader changed to new address " + resourceManagerAddress));
+
 				log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
-						resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
+					resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
 			} else {
 				log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
-						resourceManagerConnection.getTargetAddress());
+					resourceManagerConnection.getTargetAddress());
 			}
 		}
 
-		closeResourceManagerConnection();
-
 		if (resourceManagerAddress != null) {
 			log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
+
 			resourceManagerConnection = new ResourceManagerConnection(
-					log, jobGraph.getJobID(), getAddress(), leaderSessionID,
-					resourceManagerAddress, resourceManagerLeaderId, executor);
+				log,
+				jobGraph.getJobID(),
+				resourceId,
+				getAddress(),
+				leaderSessionID,
+				resourceManagerAddress,
+				resourceManagerLeaderId,
+				executor);
+
 			resourceManagerConnection.start();
 		}
 	}
 
-	private void onResourceManagerRegistrationSuccess(final JobMasterRegistrationSuccess success) {
-		validateRunsInMainThread();
+	private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) {
+		final UUID resourceManagerLeaderId = success.getResourceManagerLeaderId();
 	
 		// verify the response with current connection
 		if (resourceManagerConnection != null
-				&& resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId()))
-		{
-			log.info("JobManager successfully registered at ResourceManager, leader id: {}.",
-					success.getResourceManagerLeaderId());
+				&& resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderId)) {
 
-			slotPoolGateway.connectToResourceManager(
-					success.getResourceManagerLeaderId(), resourceManagerConnection.getTargetGateway());
+			log.info("JobManager successfully registered at ResourceManager, leader id: {}.", resourceManagerLeaderId);
+
+			final ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
+
+			slotPoolGateway.connectToResourceManager(resourceManagerLeaderId, resourceManagerGateway);
+
+			resourceManagerHeartbeatManager.monitorTarget(success.getResourceManagerResourceId(), new HeartbeatTarget<Void>() {
+				@Override
+				public void receiveHeartbeat(ResourceID resourceID, Void payload) {
+					resourceManagerGateway.heartbeatFromJobManager(resourceID);
+				}
+
+				@Override
+				public void requestHeartbeat(ResourceID resourceID, Void payload) {
+					// request heartbeat will never be called on the job manager side
+				}
+			});
 		}
 	}
 
-	private void closeResourceManagerConnection() {
-		validateRunsInMainThread();
-
+	private void closeResourceManagerConnection(Exception cause) {
 		if (resourceManagerConnection != null) {
+			log.info("Close ResourceManager connection {}.", resourceManagerConnection.getResourceManagerResourceID(), cause);
+
+			resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerResourceID());
+
+			ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
+			resourceManagerGateway.disconnectJobManager(resourceManagerConnection.getJobID(), cause);
+
 			resourceManagerConnection.close();
 			resourceManagerConnection = null;
 		}
@@ -964,13 +1008,18 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	{
 		private final JobID jobID;
 
+		private final ResourceID jobManagerResourceID;
+
 		private final String jobManagerRpcAddress;
 
 		private final UUID jobManagerLeaderID;
 
+		private ResourceID resourceManagerResourceID;
+
 		ResourceManagerConnection(
 				final Logger log,
 				final JobID jobID,
+				final ResourceID jobManagerResourceID,
 				final String jobManagerRpcAddress,
 				final UUID jobManagerLeaderID,
 				final String resourceManagerAddress,
@@ -979,6 +1028,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		{
 			super(log, resourceManagerAddress, resourceManagerLeaderID, executor);
 			this.jobID = checkNotNull(jobID);
+			this.jobManagerResourceID = checkNotNull(jobManagerResourceID);
 			this.jobManagerRpcAddress = checkNotNull(jobManagerRpcAddress);
 			this.jobManagerLeaderID = checkNotNull(jobManagerLeaderID);
 		}
@@ -998,6 +1048,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 					return gateway.registerJobManager(
 						leaderId,
 						jobManagerLeaderID,
+						jobManagerResourceID,
 						jobManagerRpcAddress,
 						jobID,
 						timeout);
@@ -1010,7 +1061,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			runAsync(new Runnable() {
 				@Override
 				public void run() {
-					onResourceManagerRegistrationSuccess(success);
+					resourceManagerResourceID = success.getResourceManagerResourceId();
+					establishResourceManagerConnection(success);
 				}
 			});
 		}
@@ -1019,6 +1071,14 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		protected void onRegistrationFailure(final Throwable failure) {
 			handleFatalError(failure);
 		}
+
+		public ResourceID getResourceManagerResourceID() {
+			return resourceManagerResourceID;
+		}
+
+		public JobID getJobID() {
+			return jobID;
+		}
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -1063,4 +1123,31 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			return FlinkCompletableFuture.completed(null);
 		}
 	}
+
+	private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
+
+		@Override
+		public void notifyHeartbeatTimeout(final ResourceID resourceId) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
+
+					closeResourceManagerConnection(
+						new TimeoutException(
+							"The heartbeat of ResourceManager with id " + resourceId + " timed out."));
+				}
+			});
+		}
+
+		@Override
+		public void reportPayload(ResourceID resourceID, Void payload) {
+			// nothing to do since the payload is of type Void
+		}
+
+		@Override
+		public Future<Void> retrievePayload() {
+			return FlinkCompletableFuture.completed(null);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 13a7372..5a271f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -226,4 +226,11 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	 * @param resourceID unique id of the task manager
 	 */
 	void heartbeatFromTaskManager(final ResourceID resourceID);
+
+	/**
+	 * Heartbeat request from the resource manager
+	 *
+	 * @param resourceID unique id of the resource manager
+	 */
+	void heartbeatFromResourceManager(final ResourceID resourceID);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
index 4058452..a7a6224 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmaster;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 
 import java.util.UUID;
@@ -35,9 +36,15 @@ public class JobMasterRegistrationSuccess extends RegistrationResponse.Success {
 
 	private final UUID resourceManagerLeaderId;
 
-	public JobMasterRegistrationSuccess(final long heartbeatInterval, final UUID resourceManagerLeaderId) {
+	private final ResourceID resourceManagerResourceId;
+
+	public JobMasterRegistrationSuccess(
+			final long heartbeatInterval,
+			final UUID resourceManagerLeaderId,
+			final ResourceID resourceManagerResourceId) {
 		this.heartbeatInterval = heartbeatInterval;
 		this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId);
+		this.resourceManagerResourceId = checkNotNull(resourceManagerResourceId);
 	}
 
 	/**
@@ -53,11 +60,16 @@ public class JobMasterRegistrationSuccess extends RegistrationResponse.Success {
 		return resourceManagerLeaderId;
 	}
 
+	public ResourceID getResourceManagerResourceId() {
+		return resourceManagerResourceId;
+	}
+
 	@Override
 	public String toString() {
 		return "JobMasterRegistrationSuccess{" +
 			"heartbeatInterval=" + heartbeatInterval +
 			", resourceManagerLeaderId=" + resourceManagerLeaderId +
+			", resourceManagerResourceId=" + resourceManagerResourceId +
 			'}';
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index c0ff412..f17dbe5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -79,7 +79,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * It offers the following methods as part of its rpc interface to interact with him remotely:
  * <ul>
- *     <li>{@link #registerJobManager(UUID, UUID, String, JobID)} registers a {@link JobMaster} at the resource manager</li>
+ *     <li>{@link #registerJobManager(UUID, UUID, ResourceID, String, JobID)} registers a {@link JobMaster} at the resource manager</li>
  *     <li>{@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from the resource manager</li>
  * </ul>
  */
@@ -96,6 +96,9 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	/** All currently registered JobMasterGateways scoped by JobID. */
 	private final Map<JobID, JobManagerRegistration> jobManagerRegistrations;
 
+	/** All currently registered JobMasterGateways scoped by ResourceID. */
+	private final Map<ResourceID, JobManagerRegistration> jmResourceIdRegistrations;
+
 	/** Service to retrieve the job leader ids */
 	private final JobLeaderIdService jobLeaderIdService;
 
@@ -108,6 +111,9 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	/** The heartbeat manager with task managers. */
 	private final HeartbeatManager<Void, Void> taskManagerHeartbeatManager;
 
+	/** The heartbeat manager with job managers. */
+	private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
+
 	/** The factory to construct the SlotManager. */
 	private final SlotManagerFactory slotManagerFactory;
 
@@ -152,12 +158,19 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
 
 		this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
-				resourceId,
-				new TaskManagerHeartbeatListener(),
-				rpcService.getScheduledExecutor(),
-				log);
+			resourceId,
+			new TaskManagerHeartbeatListener(),
+			rpcService.getScheduledExecutor(),
+			log);
+
+		this.jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
+			resourceId,
+			new JobManagerHeartbeatListener(),
+			rpcService.getScheduledExecutor(),
+			log);
 
 		this.jobManagerRegistrations = new HashMap<>(4);
+		this.jmResourceIdRegistrations = new HashMap<>(4);
 		this.taskExecutors = new HashMap<>(8);
 		this.leaderSessionId = null;
 		infoMessageListeners = new ConcurrentHashMap<>(8);
@@ -204,6 +217,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 
 		taskManagerHeartbeatManager.stop();
 
+		jobManagerHeartbeatManager.stop();
+
 		try {
 			super.shutDown();
 		} catch (Exception e) {
@@ -231,11 +246,13 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	public Future<RegistrationResponse> registerJobManager(
 			final UUID resourceManagerLeaderId,
 			final UUID jobManagerLeaderId,
+			final ResourceID jobManagerResourceId,
 			final String jobManagerAddress,
 			final JobID jobId) {
 
 		checkNotNull(resourceManagerLeaderId);
 		checkNotNull(jobManagerLeaderId);
+		checkNotNull(jobManagerResourceId);
 		checkNotNull(jobManagerAddress);
 		checkNotNull(jobId);
 
@@ -276,7 +293,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 
 			Future<RegistrationResponse> registrationResponseFuture = jobMasterGatewayFuture.thenCombineAsync(jobLeaderIdFuture, new BiFunction<JobMasterGateway, UUID, RegistrationResponse>() {
 				@Override
-				public RegistrationResponse apply(JobMasterGateway jobMasterGateway, UUID jobLeaderId) {
+				public RegistrationResponse apply(final JobMasterGateway jobMasterGateway, UUID jobLeaderId) {
 					if (isValid(resourceManagerLeaderId)) {
 						if (jobLeaderId.equals(jobManagerLeaderId)) {
 							if (jobManagerRegistrations.containsKey(jobId)) {
@@ -291,21 +308,43 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 										oldJobManagerRegistration.getJobID(),
 										new Exception("New job leader for job " + jobId + " found."));
 
-									JobManagerRegistration jobManagerRegistration = new JobManagerRegistration(jobId, jobLeaderId, jobMasterGateway);
+									JobManagerRegistration jobManagerRegistration = new JobManagerRegistration(
+										jobId,
+										jobManagerResourceId,
+										jobLeaderId,
+										jobMasterGateway);
 									jobManagerRegistrations.put(jobId, jobManagerRegistration);
+									jmResourceIdRegistrations.put(jobManagerResourceId, jobManagerRegistration);
 								}
 							} else {
 								// new registration for the job
-								JobManagerRegistration jobManagerRegistration = new JobManagerRegistration(jobId, jobLeaderId, jobMasterGateway);
-
+								JobManagerRegistration jobManagerRegistration = new JobManagerRegistration(
+									jobId,
+									jobManagerResourceId,
+									jobLeaderId,
+									jobMasterGateway);
 								jobManagerRegistrations.put(jobId, jobManagerRegistration);
+								jmResourceIdRegistrations.put(jobManagerResourceId, jobManagerRegistration);
 							}
 
 							log.info("Registered job manager {}@{} for job {}.", jobManagerLeaderId, jobManagerAddress, jobId);
 
+							jobManagerHeartbeatManager.monitorTarget(jobManagerResourceId, new HeartbeatTarget<Void>() {
+								@Override
+								public void receiveHeartbeat(ResourceID resourceID, Void payload) {
+									// the ResourceManager will always send heartbeat requests to the JobManager
+								}
+
+								@Override
+								public void requestHeartbeat(ResourceID resourceID, Void payload) {
+									jobMasterGateway.heartbeatFromResourceManager(resourceID);
+								}
+							});
+
 							return new JobMasterRegistrationSuccess(
 								resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds(),
-								getLeaderSessionId());
+								getLeaderSessionId(),
+								resourceId);
 
 						} else {
 							log.debug("The job manager leader id {} did not match the job " +
@@ -423,10 +462,20 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	}
 
 	@RpcMethod
+	public void heartbeatFromJobManager(final ResourceID resourceID) {
+		jobManagerHeartbeatManager.receiveHeartbeat(resourceID, null);
+	}
+
+	@RpcMethod
 	public void disconnectTaskManager(final ResourceID resourceId, final Exception cause) {
 		closeTaskManagerConnection(resourceId, cause);
 	}
 
+	@RpcMethod
+	public void disconnectJobManager(final JobID jobId, final Exception cause) {
+		closeJobManagerConnection(jobId, cause);
+	}
+
 	/**
 	 * Requests a slot from the resource manager.
 	 *
@@ -577,6 +626,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 
 	private void clearState() {
 		jobManagerRegistrations.clear();
+		jmResourceIdRegistrations.clear();
 		taskExecutors.clear();
 		slotManager.clearState();
 
@@ -590,23 +640,31 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	}
 
 	/**
-	 * Disconnects the job manager which is connected for the given job from the resource manager.
+	 * This method should be called by the framework once it detects that a currently registered
+	 * job manager has failed.
 	 *
-	 * @param jobId identifying the job whose leader shall be disconnected
+	 * @param jobId identifying the job whose leader shall be disconnected.
+	 * @param cause The exception which caused the JobManager to fail.
 	 */
-	protected void disconnectJobManager(JobID jobId, Exception cause) {
+	protected void closeJobManagerConnection(JobID jobId, Exception cause) {
 		JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.remove(jobId);
 
 		if (jobManagerRegistration != null) {
+			final ResourceID jobManagerResourceId = jobManagerRegistration.getJobManagerResourceID();
+			final JobMasterGateway jobMasterGateway = jobManagerRegistration.getJobManagerGateway();
+			final UUID jobManagerLeaderId = jobManagerRegistration.getLeaderID();
+
 			log.info("Disconnect job manager {}@{} for job {} from the resource manager.",
-				jobManagerRegistration.getLeaderID(),
-				jobManagerRegistration.getJobManagerGateway().getAddress(),
+				jobManagerLeaderId,
+				jobMasterGateway.getAddress(),
 				jobId);
 
-			JobMasterGateway jobMasterGateway = jobManagerRegistration.getJobManagerGateway();
+			jobManagerHeartbeatManager.unmonitorTarget(jobManagerResourceId);
+
+			jmResourceIdRegistrations.remove(jobManagerResourceId);
 
 			// tell the job manager about the disconnect
-			jobMasterGateway.disconnectResourceManager(jobManagerRegistration.getLeaderID(), getLeaderSessionId(), cause);
+			jobMasterGateway.disconnectResourceManager(jobManagerLeaderId, getLeaderSessionId(), cause);
 		} else {
 			log.debug("There was no registered job manager for job {}.", jobId);
 		}
@@ -882,14 +940,47 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 
 		@Override
 		public void notifyHeartbeatTimeout(final ResourceID resourceID) {
-			log.info("The heartbeat of TaskManager with id {} timed out.", resourceID);
-
 			runAsync(new Runnable() {
 				@Override
 				public void run() {
+					log.info("The heartbeat of TaskManager with id {} timed out.", resourceID);
+
 					closeTaskManagerConnection(
-						resourceID,
-						new TimeoutException("Task manager with id " + resourceID + " heartbeat timed out."));
+							resourceID,
+							new TimeoutException("The heartbeat of TaskManager with id " + resourceID + "  timed out."));
+				}
+			});
+		}
+
+		@Override
+		public void reportPayload(ResourceID resourceID, Void payload) {
+			// nothing to do since there is no payload
+		}
+
+		@Override
+		public Future<Void> retrievePayload() {
+			return FlinkCompletableFuture.completed(null);
+		}
+	}
+
+	private class JobManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
+
+		@Override
+		public void notifyHeartbeatTimeout(final ResourceID resourceID) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					log.info("The heartbeat of JobManager with id {} timed out.", resourceID);
+
+					if (jmResourceIdRegistrations.containsKey(resourceID)) {
+						JobManagerRegistration jobManagerRegistration = jmResourceIdRegistrations.get(resourceID);
+
+						if (jobManagerRegistration != null) {
+							closeJobManagerConnection(
+								jobManagerRegistration.getJobID(),
+								new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out."));
+						}
+					}
 				}
 			});
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index cda4a7c..530113f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -44,6 +44,7 @@ public interface ResourceManagerGateway extends RpcGateway {
 	 *
 	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
 	 * @param jobMasterLeaderId The fencing token for the JobMaster leader
+	 * @param jobMasterResourceId   The resource ID of the JobMaster that registers
 	 * @param jobMasterAddress        The address of the JobMaster that registers
 	 * @param jobID                   The Job ID of the JobMaster that registers
 	 * @param timeout                 Timeout for the future to complete
@@ -52,11 +53,11 @@ public interface ResourceManagerGateway extends RpcGateway {
 	Future<RegistrationResponse> registerJobManager(
 		UUID resourceManagerLeaderId,
 		UUID jobMasterLeaderId,
+		ResourceID jobMasterResourceId,
 		String jobMasterAddress,
 		JobID jobID,
 		@RpcTimeout Time timeout);
 
-
 	/**
 	 * Requests a slot from the resource manager.
 	 *
@@ -139,10 +140,25 @@ public interface ResourceManagerGateway extends RpcGateway {
 	void heartbeatFromTaskManager(final ResourceID heartbeatOrigin);
 
 	/**
+	 * Sends the heartbeat to resource manager from job manager
+	 *
+	 * @param heartbeatOrigin unique id of the job manager
+	 */
+	void heartbeatFromJobManager(final ResourceID heartbeatOrigin);
+
+	/**
 	 * Disconnects a TaskManager specified by the given resourceID from the {@link ResourceManager}.
 	 *
 	 * @param resourceID identifying the TaskManager to disconnect
 	 * @param cause for the disconnection of the TaskManager
 	 */
 	void disconnectTaskManager(ResourceID resourceID, Exception cause);
+
+	/**
+	 * Disconnects the JobManager that leads the job with the given JobID from the {@link ResourceManager}.
+	 *
+	 * @param jobId JobID for which the JobManager was the leader
+	 * @param cause for the disconnection of the JobManager
+	 */
+	void disconnectJobManager(JobID jobId, Exception cause);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobManagerRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobManagerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobManagerRegistration.java
index a1deb65..df3a39f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobManagerRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobManagerRegistration.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.resourcemanager.registration;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.util.Preconditions;
 
@@ -30,15 +31,19 @@ import java.util.UUID;
 public class JobManagerRegistration {
 	private final JobID jobID;
 
+	private final ResourceID jobManagerResourceID;
+
 	private final UUID leaderID;
 
 	private final JobMasterGateway jobManagerGateway;
 
 	public JobManagerRegistration(
 			JobID jobID,
+			ResourceID jobManagerResourceID,
 			UUID leaderID,
 			JobMasterGateway jobManagerGateway) {
 		this.jobID = Preconditions.checkNotNull(jobID);
+		this.jobManagerResourceID = Preconditions.checkNotNull(jobManagerResourceID);
 		this.leaderID = Preconditions.checkNotNull(leaderID);
 		this.jobManagerGateway = Preconditions.checkNotNull(jobManagerGateway);
 	}
@@ -47,6 +52,10 @@ public class JobManagerRegistration {
 		return jobID;
 	}
 
+	public ResourceID getJobManagerResourceID() {
+		return jobManagerResourceID;
+	}
+
 	public UUID getLeaderID() {
 		return leaderID;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index 3464129..e4e20b9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework;
 
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -37,7 +38,10 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.metrics.MetricRegistry;
@@ -412,27 +416,32 @@ public class ResourceManagerTest extends TestLogger {
 
 			final SlotReport slotReport = new SlotReport();
 			// test registration response successful and it will trigger monitor heartbeat target, schedule heartbeat request at interval time
-			Future<RegistrationResponse> successfulFuture =
-					resourceManager.registerTaskExecutor(rmLeaderSessionId, taskManagerAddress, taskManagerResourceID, slotReport);
+			Future<RegistrationResponse> successfulFuture = resourceManager.registerTaskExecutor(
+				rmLeaderSessionId,
+				taskManagerAddress,
+				taskManagerResourceID,
+				slotReport);
 			RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
 			assertTrue(response instanceof TaskExecutorRegistrationSuccess);
 
 			ArgumentCaptor<Runnable> heartbeatRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
-			verify(scheduledExecutor, times(1)).scheduleAtFixedRate(
+			verify(scheduledExecutor, times(2)).scheduleAtFixedRate(
 				heartbeatRunnableCaptor.capture(),
 				eq(0L),
 				eq(heartbeatInterval),
 				eq(TimeUnit.MILLISECONDS));
 
-			Runnable heartbeatRunnable = heartbeatRunnableCaptor.getValue();
+			List<Runnable> heartbeatRunnable = heartbeatRunnableCaptor.getAllValues();
 
 			ArgumentCaptor<Runnable> timeoutRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
 			verify(scheduledExecutor).schedule(timeoutRunnableCaptor.capture(), eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS));
 
 			Runnable timeoutRunnable = timeoutRunnableCaptor.getValue();
 
-			// run the first heartbeat request
-			heartbeatRunnable.run();
+			// run all the heartbeat requests
+			for (Runnable runnable : heartbeatRunnable) {
+				runnable.run();
+			}
 
 			verify(taskExecutorGateway, times(1)).heartbeatFromResourceManager(eq(resourceManagerResourceID));
 
@@ -445,4 +454,99 @@ public class ResourceManagerTest extends TestLogger {
 			rpcService.stopService();
 		}
 	}
+
+	@Test
+	public void testHeartbeatTimeoutWithJobManager() throws Exception {
+		final String jobMasterAddress = "jm";
+		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
+		final ResourceID rmResourceId = ResourceID.generate();
+		final UUID rmLeaderId = UUID.randomUUID();
+		final UUID jmLeaderId = UUID.randomUUID();
+		final JobID jobId = new JobID();
+
+		final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
+
+		final TestingSerialRpcService rpcService = new TestingSerialRpcService();
+		rpcService.registerGateway(jobMasterAddress, jobMasterGateway);
+
+		final ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
+			Time.seconds(5L),
+			Time.seconds(5L));
+
+		final TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
+		final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderId);
+		final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+		highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
+		highAvailabilityServices.setJobMasterLeaderRetriever(jobId, jmLeaderRetrievalService);
+
+		final long heartbeatInterval = 1L;
+		final long heartbeatTimeout = 5L;
+		final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
+		final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor);
+
+		final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
+		final MetricRegistry metricRegistry = mock(MetricRegistry.class);
+		final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+			highAvailabilityServices,
+			rpcService.getScheduledExecutor(),
+			Time.minutes(5L));
+		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+		try {
+			final StandaloneResourceManager resourceManager = new StandaloneResourceManager(
+				rpcService,
+				FlinkResourceManager.RESOURCE_MANAGER_NAME,
+				rmResourceId,
+				resourceManagerConfiguration,
+				highAvailabilityServices,
+				heartbeatServices,
+				slotManagerFactory,
+				metricRegistry,
+				jobLeaderIdService,
+				testingFatalErrorHandler);
+
+			resourceManager.start();
+
+			rmLeaderElectionService.isLeader(rmLeaderId);
+
+			// a successful registration should start monitoring the heartbeat target and schedule heartbeat requests at the heartbeat interval
+			Future<RegistrationResponse> successfulFuture = resourceManager.registerJobManager(
+				rmLeaderId,
+				jmLeaderId,
+				jmResourceId,
+				jobMasterAddress,
+				jobId);
+			RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
+			assertTrue(response instanceof JobMasterRegistrationSuccess);
+
+			ArgumentCaptor<Runnable> heartbeatRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
+			verify(scheduledExecutor, times(2)).scheduleAtFixedRate(
+				heartbeatRunnableCaptor.capture(),
+				eq(0L),
+				eq(heartbeatInterval),
+				eq(TimeUnit.MILLISECONDS));
+
+			List<Runnable> heartbeatRunnable = heartbeatRunnableCaptor.getAllValues();
+
+			ArgumentCaptor<Runnable> timeoutRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
+			verify(scheduledExecutor).schedule(timeoutRunnableCaptor.capture(), eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS));
+
+			Runnable timeoutRunnable = timeoutRunnableCaptor.getValue();
+
+			// run all the heartbeat requests
+			for (Runnable runnable : heartbeatRunnable) {
+				runnable.run();
+			}
+
+			verify(jobMasterGateway, times(1)).heartbeatFromResourceManager(eq(rmResourceId));
+
+			// run the timeout runnable to simulate a heartbeat timeout
+			timeoutRunnable.run();
+
+			verify(jobMasterGateway).disconnectResourceManager(eq(jmLeaderId), eq(rmLeaderId), any(TimeoutException.class));
+
+		} finally {
+			rpcService.stopService();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 8b9b800..0b25e6c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.runtime.jobmaster;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
@@ -32,6 +34,8 @@ import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -52,9 +56,8 @@ import java.util.concurrent.TimeoutException;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(BlobLibraryCacheManager.class)
@@ -144,5 +147,85 @@ public class JobMasterTest extends TestLogger {
 		}
 	}
 
+	@Test
+	public void testHeartbeatTimeoutWithResourceManager() throws Exception {
+		final String resourceManagerAddress = "rm";
+		final String jobManagerAddress = "jm";
+		final UUID rmLeaderId = UUID.randomUUID();
+		final UUID jmLeaderId = UUID.randomUUID();
+		final ResourceID rmResourceId = new ResourceID(resourceManagerAddress);
+		final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
+		final JobGraph jobGraph = new JobGraph();
+
+		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService(
+			null,
+			null);
+		haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
+		haServices.setCheckpointRecoveryFactory(mock(CheckpointRecoveryFactory.class));
+
+		final long heartbeatInterval = 1L;
+		final long heartbeatTimeout = 5L;
+		final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
+		final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor);
+
+		final ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
+		when(resourceManagerGateway.registerJobManager(
+			any(UUID.class),
+			any(UUID.class),
+			any(ResourceID.class),
+			anyString(),
+			any(JobID.class),
+			any(Time.class)
+		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JobMasterRegistrationSuccess(
+			heartbeatInterval, rmLeaderId, rmResourceId)));
+
+		final TestingSerialRpcService rpc = new TestingSerialRpcService();
+		rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
+
+		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+		try {
+			final JobMaster jobMaster = new JobMaster(
+				rpc,
+				jmResourceId,
+				jobGraph,
+				new Configuration(),
+				haServices,
+				heartbeatServices,
+				Executors.newScheduledThreadPool(1),
+				mock(BlobLibraryCacheManager.class),
+				mock(RestartStrategyFactory.class),
+				Time.of(10, TimeUnit.SECONDS),
+				null,
+				mock(OnCompletionActions.class),
+				testingFatalErrorHandler,
+				new FlinkUserCodeClassLoader(new URL[0]));
+
+			jobMaster.start(jmLeaderId);
+
+			// define a leader and see that a registration happens
+			rmLeaderRetrievalService.notifyListener(resourceManagerAddress, rmLeaderId);
+
+			// a successful job manager registration triggers heartbeat monitoring between the JM and the RM
+			verify(resourceManagerGateway).registerJobManager(
+				eq(rmLeaderId),
+				eq(jmLeaderId),
+				eq(jmResourceId),
+				anyString(),
+				eq(jobGraph.getJobID()),
+				any(Time.class));
+
+			// a heartbeat timeout should trigger disconnecting the JobManager from the ResourceManager
+			verify(resourceManagerGateway, timeout(heartbeatTimeout * 50L)).disconnectJobManager(eq(jobGraph.getJobID()), any(TimeoutException.class));
+
+			// check if a concurrent error occurred
+			testingFatalErrorHandler.rethrowError();
+
+		} finally {
+			rpc.stopService();
+		}
+	}
+
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index b6b8614..6a151ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -70,13 +70,19 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 		JobID jobID = mockJobMaster(jobMasterAddress);
 		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
 		UUID jmLeaderID = UUID.randomUUID();
+		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 
 		// test response successful
-		Future<RegistrationResponse> successfulFuture = resourceManager.registerJobManager(rmLeaderSessionId, jmLeaderID, jobMasterAddress, jobID);
+		Future<RegistrationResponse> successfulFuture = resourceManager.registerJobManager(
+			rmLeaderSessionId,
+			jmLeaderID,
+			jmResourceId,
+			jobMasterAddress,
+			jobID);
 		RegistrationResponse response = successfulFuture.get(5L, TimeUnit.SECONDS);
 		assertTrue(response instanceof JobMasterRegistrationSuccess);
 
@@ -94,6 +100,7 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 		JobID jobID = mockJobMaster(jobMasterAddress);
 		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
 		UUID jmLeaderID = UUID.randomUUID();
+		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
@@ -101,7 +108,12 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 
 		// test throw exception when receive a registration from job master which takes unmatched leaderSessionId
 		UUID differentLeaderSessionID = UUID.randomUUID();
-		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(differentLeaderSessionID, jmLeaderID, jobMasterAddress, jobID);
+		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(
+			differentLeaderSessionID,
+			jmLeaderID,
+			jmResourceId,
+			jobMasterAddress,
+			jobID);
 		assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
 
 		if (testingFatalErrorHandler.hasExceptionOccurred()) {
@@ -124,10 +136,16 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 
 		// test throw exception when receive a registration from job master which takes unmatched leaderSessionId
 		UUID differentLeaderSessionID = UUID.randomUUID();
-		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(rmLeaderSessionId, differentLeaderSessionID, jobMasterAddress, jobID);
+		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(
+			rmLeaderSessionId,
+			differentLeaderSessionID,
+			jmResourceId,
+			jobMasterAddress,
+			jobID);
 		assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
 
 		if (testingFatalErrorHandler.hasExceptionOccurred()) {
@@ -150,10 +168,16 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 
 		// test throw exception when receive a registration from job master which takes invalid address
 		String invalidAddress = "/jobMasterAddress2";
-		Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerJobManager(rmLeaderSessionId, jmLeaderSessionId, invalidAddress, jobID);
+		Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerJobManager(
+			rmLeaderSessionId,
+			jmLeaderSessionId,
+			jmResourceId,
+			invalidAddress,
+			jobID);
 		assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
 
 		if (testingFatalErrorHandler.hasExceptionOccurred()) {
@@ -176,10 +200,16 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 
 		JobID unknownJobIDToHAServices = new JobID();
 		// verify return RegistrationResponse.Decline when failed to start a job master Leader retrieval listener
-		Future<RegistrationResponse> declineFuture = resourceManager.registerJobManager(rmLeaderSessionId, jmLeaderSessionId, jobMasterAddress, unknownJobIDToHAServices);
+		Future<RegistrationResponse> declineFuture = resourceManager.registerJobManager(
+			rmLeaderSessionId,
+			jmLeaderSessionId,
+			jmResourceId,
+			jobMasterAddress,
+			unknownJobIDToHAServices);
 		RegistrationResponse response = declineFuture.get(5, TimeUnit.SECONDS);
 		assertTrue(response instanceof RegistrationResponse.Decline);
 
@@ -205,7 +235,7 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 		highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
 		highAvailabilityServices.setJobMasterLeaderRetriever(jobID, jobMasterLeaderRetrievalService);
 
-		HeartbeatServices heartbeatServices = mock(HeartbeatServices.class);
+		HeartbeatServices heartbeatServices = new HeartbeatServices(5L, 5L);
 
 		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
 			Time.seconds(5L),

http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 4d2309a..37690b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -101,6 +101,7 @@ public class SlotProtocolTest extends TestLogger {
 		final String jmAddress = "/jm1";
 		final JobID jobID = new JobID();
 		final ResourceID rmResourceId = new ResourceID(rmAddress);
+		final ResourceID jmResourceId = new ResourceID(jmAddress);
 
 		testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
 
@@ -138,7 +139,7 @@ public class SlotProtocolTest extends TestLogger {
 		rmLeaderElectionService.isLeader(rmLeaderID);
 
 		Future<RegistrationResponse> registrationFuture =
-			resourceManager.registerJobManager(rmLeaderID, jmLeaderID, jmAddress, jobID);
+			resourceManager.registerJobManager(rmLeaderID, jmLeaderID, jmResourceId, jmAddress, jobID);
 		try {
 			registrationFuture.get(5, TimeUnit.SECONDS);
 		} catch (Exception e) {
@@ -207,6 +208,7 @@ public class SlotProtocolTest extends TestLogger {
 		final String tmAddress = "/tm1";
 		final JobID jobID = new JobID();
 		final ResourceID rmResourceId = new ResourceID(rmAddress);
+		final ResourceID jmResourceId = new ResourceID(jmAddress);
 
 		testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
 
@@ -254,7 +256,7 @@ public class SlotProtocolTest extends TestLogger {
 		Thread.sleep(1000);
 
 		Future<RegistrationResponse> registrationFuture =
-			resourceManager.registerJobManager(rmLeaderID, jmLeaderID, jmAddress, jobID);
+			resourceManager.registerJobManager(rmLeaderID, jmLeaderID, jmResourceId, jmAddress, jobID);
 		try {
 			registrationFuture.get(5L, TimeUnit.SECONDS);
 		} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index fa326b5..579ca3a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -90,6 +90,7 @@ public class TaskExecutorITCase extends TestLogger {
 		final String jmAddress = "jm";
 		final UUID jmLeaderId = UUID.randomUUID();
 		final ResourceID rmResourceId = new ResourceID(rmAddress);
+		final ResourceID jmResourceId = new ResourceID(jmAddress);
 		final JobID jobId = new JobID();
 		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
 
@@ -180,7 +181,12 @@ public class TaskExecutorITCase extends TestLogger {
 			// notify the TM about the new RM leader
 			rmLeaderRetrievalService.notifyListener(rmAddress, rmLeaderId);
 
-			Future<RegistrationResponse> registrationResponseFuture = resourceManager.registerJobManager(rmLeaderId, jmLeaderId, jmAddress, jobId);
+			Future<RegistrationResponse> registrationResponseFuture = resourceManager.registerJobManager(
+				rmLeaderId,
+				jmLeaderId,
+				jmResourceId,
+				jmAddress,
+				jobId);
 
 			RegistrationResponse registrationResponse = registrationResponseFuture.get();
 


Mime
View raw message