flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/2] flink git commit: [FLINK-4364] [heartbeats] Implement TaskManager side of heartbeat from JobManager
Date Fri, 17 Mar 2017 16:00:23 GMT
Repository: flink
Updated Branches:
  refs/heads/master 521a53d9a -> 97ccc1473


[FLINK-4364] [heartbeats] Implement TaskManager side of heartbeat from JobManager

This closes #3151.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0b3d5c27
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0b3d5c27
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0b3d5c27

Branch: refs/heads/master
Commit: 0b3d5c27f4ab7b2dffb37160a1f01cb822bb696e
Parents: 521a53d
Author: 淘江 <taojiang.wzj@alibaba-inc.com>
Authored: Wed Jan 18 19:08:19 2017 +0800
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Fri Mar 17 17:00:04 2017 +0100

----------------------------------------------------------------------
 .../configuration/HeartbeatManagerOptions.java  |  45 ++++++
 .../runtime/heartbeat/HeartbeatManagerImpl.java |  18 +--
 .../heartbeat/HeartbeatManagerSenderImpl.java   |  16 +-
 .../heartbeat/TestingHeartbeatManagerImpl.java  |  62 ++++++++
 .../TestingHeartbeatManagerSenderImpl.java      |  60 ++++++++
 .../runtime/jobmaster/JobManagerRunner.java     |  15 ++
 .../flink/runtime/jobmaster/JobMaster.java      |  62 +++++++-
 .../runtime/jobmaster/JobMasterGateway.java     |   7 +
 .../taskexecutor/JobManagerConnection.java      |  20 +++
 .../runtime/taskexecutor/TaskExecutor.java      |  92 +++++++++++-
 .../taskexecutor/TaskExecutorGateway.java       |   8 +
 .../runtime/taskexecutor/TaskManagerRunner.java |  10 ++
 .../runtime/heartbeat/HeartbeatManagerTest.java |  22 +--
 .../flink/runtime/jobmaster/JobMasterTest.java  | 146 +++++++++++++++++++
 .../taskexecutor/TaskExecutorITCase.java        |   3 +
 .../runtime/taskexecutor/TaskExecutorTest.java  | 127 +++++++++++++++-
 16 files changed, 669 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java
new file mode 100644
index 0000000..2258eb1
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to heartbeat manager settings.
+ */
+@PublicEvolving
+public class HeartbeatManagerOptions {
+
+	/** Time interval in milliseconds at which the sender side requests heartbeats */
+	public static final ConfigOption<Long> HEARTBEAT_INTERVAL =
+			key("heartbeat.sender.interval")
+			.defaultValue(10000L);
+
+	/** Timeout in milliseconds for requesting and receiving heartbeats, used by both the sender and receiver sides */
+	public static final ConfigOption<Long> HEARTBEAT_TIMEOUT =
+			key("heartbeat.timeout")
+			.defaultValue(50000L);
+
+	// ------------------------------------------------------------------------
+
+	/** Not intended to be instantiated */
+	private HeartbeatManagerOptions() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
index 42b1c85..9860b4d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.heartbeat;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 
@@ -28,7 +29,6 @@ import javax.annotation.concurrent.ThreadSafe;
 import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -56,7 +56,7 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
 	private final HeartbeatListener<I, O> heartbeatListener;
 
 	/** Executor service used to run heartbeat timeout notifications */
-	private final ScheduledExecutorService scheduledExecutorService;
+	private final ScheduledExecutor scheduledExecutor;
 
 	protected final Logger log;
 
@@ -74,14 +74,14 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
 			ResourceID ownResourceID,
 			HeartbeatListener<I, O> heartbeatListener,
 			Executor executor,
-			ScheduledExecutorService scheduledExecutorService,
+			ScheduledExecutor scheduledExecutor,
 			Logger log) {
 		Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat timeout has to be larger than 0.");
 
 		this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;
 		this.ownResourceID = Preconditions.checkNotNull(ownResourceID);
 		this.heartbeatListener = Preconditions.checkNotNull(heartbeatListener, "heartbeatListener");
-		this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService);
+		this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
 		this.log = Preconditions.checkNotNull(log);
 		this.executor = Preconditions.checkNotNull(executor);
 		this.heartbeatTargets = new ConcurrentHashMap<>(16);
@@ -122,7 +122,7 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
 				HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor = new HeartbeatManagerImpl.HeartbeatMonitor<>(
 					resourceID,
 					heartbeatTarget,
-					scheduledExecutorService,
+					scheduledExecutor,
 					heartbeatListener,
 					heartbeatTimeoutIntervalMs);
 
@@ -236,7 +236,7 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
 		/** Associated heartbeat target */
 		private final HeartbeatTarget<O> heartbeatTarget;
 
-		private final ScheduledExecutorService scheduledExecutorService;
+		private final ScheduledExecutor scheduledExecutor;
 
 		/** Listener which is notified about heartbeat timeouts */
 		private final HeartbeatListener<?, ?> heartbeatListener;
@@ -251,13 +251,13 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
 		HeartbeatMonitor(
 			ResourceID resourceID,
 			HeartbeatTarget<O> heartbeatTarget,
-			ScheduledExecutorService scheduledExecutorService,
+			ScheduledExecutor scheduledExecutor,
 			HeartbeatListener<?, O> heartbeatListener,
 			long heartbeatTimeoutIntervalMs) {
 
 			this.resourceID = Preconditions.checkNotNull(resourceID);
 			this.heartbeatTarget = Preconditions.checkNotNull(heartbeatTarget);
-			this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService);
+			this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
 			this.heartbeatListener = Preconditions.checkNotNull(heartbeatListener);
 
 			Preconditions.checkArgument(heartbeatTimeoutIntervalMs >= 0L, "The heartbeat timeout interval has to be larger than 0.");
@@ -278,7 +278,7 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
 			if (state.get() == State.RUNNING) {
 				cancelTimeout();
 
-				futureTimeout = scheduledExecutorService.schedule(this, heartbeatTimeout, TimeUnit.MILLISECONDS);
+				futureTimeout = scheduledExecutor.schedule(this, heartbeatTimeout, TimeUnit.MILLISECONDS);
 
 				// Double check for concurrent accesses (e.g. a firing of the scheduled future)
 				if (state.get() != State.RUNNING) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
index 57c8671..32f8aa3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
@@ -21,10 +21,10 @@ package org.apache.flink.runtime.heartbeat;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.slf4j.Logger;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit;
  * its monitored {@link HeartbeatTarget}. The heartbeat period is configurable.
  *
  * @param <I> Type of the incoming heartbeat payload
- * @param <O> Type of the outgoind heartbeat payload
+ * @param <O> Type of the outgoing heartbeat payload
  */
 public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> implements Runnable {
 
@@ -44,18 +44,18 @@ public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O>
 			long heartbeatTimeout,
 			ResourceID ownResourceID,
 			HeartbeatListener<I, O> heartbeatListener,
-			ExecutorService executorService,
-			ScheduledExecutorService scheduledExecutorService,
+			Executor executor,
+			ScheduledExecutor scheduledExecutor,
 			Logger log) {
 		super(
 			heartbeatTimeout,
 			ownResourceID,
 			heartbeatListener,
-			executorService,
-			scheduledExecutorService,
+			executor,
+			scheduledExecutor,
 			log);
 
-		triggerFuture = scheduledExecutorService.scheduleAtFixedRate(this, 0L, heartbeatPeriod, TimeUnit.MILLISECONDS);
+		triggerFuture = scheduledExecutor.scheduleAtFixedRate(this, 0L, heartbeatPeriod, TimeUnit.MILLISECONDS);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java
new file mode 100644
index 0000000..1238f1a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.slf4j.Logger;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+
+/**
+ * Heartbeat manager implementation which extends {@link HeartbeatManagerImpl} for testing.
+ * It overrides {@link #unmonitorTarget(ResourceID)} to wait on a latch, so that a test can
+ * complete its checks before the target is unmonitored after a heartbeat timeout.
+ *
+ * @param <I> Type of the incoming heartbeat payload
+ * @param <O> Type of the outgoing heartbeat payload
+ */
+public class TestingHeartbeatManagerImpl<I, O> extends HeartbeatManagerImpl<I, O> {
+
+	private final CountDownLatch waitLatch;
+
+	public TestingHeartbeatManagerImpl(
+			CountDownLatch waitLatch,
+			long heartbeatTimeoutIntervalMs,
+			ResourceID ownResourceID,
+			Executor executor,
+			ScheduledExecutor scheduledExecutor,
+			Logger log) {
+
+		super(heartbeatTimeoutIntervalMs, ownResourceID, executor, scheduledExecutor, log);
+
+		this.waitLatch = waitLatch;
+	}
+
+	@Override
+	public void unmonitorTarget(ResourceID resourceID) {
+		try {
+			waitLatch.await();
+		} catch (InterruptedException ex) {
+			log.error("Unexpected interrupted exception.", ex);
+		}
+
+		super.unmonitorTarget(resourceID);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java
new file mode 100644
index 0000000..7000895
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.slf4j.Logger;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+
+/**
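+ * Testing variant of {@link HeartbeatManagerSenderImpl} whose {@link #unmonitorTarget(ResourceID)} first waits on a latch.
+ * @param <I> Type of the incoming heartbeat payload
+ * @param <O> Type of the outgoing heartbeat payload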
+ */
+public class TestingHeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerSenderImpl<I, O> {
+
+	private final CountDownLatch waitLatch;
+
+	public TestingHeartbeatManagerSenderImpl(
+			CountDownLatch waitLatch,
+			long heartbeatPeriod,
+			long heartbeatTimeout,
+			ResourceID ownResourceID,
+			Executor executor,
+			ScheduledExecutor scheduledExecutor,
+			Logger log) {
+
+		super(heartbeatPeriod, heartbeatTimeout, ownResourceID, executor, scheduledExecutor, log);
+
+		this.waitLatch = waitLatch;
+	}
+
+	@Override
+	public void unmonitorTarget(ResourceID resourceID) {
+		try {
+			waitLatch.await();
+		} catch (InterruptedException ex) {
+			log.error("Unexpected interrupted exception.", ex);
+		}
+
+		super.unmonitorTarget(resourceID);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 6e02813..eced869 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -21,8 +21,11 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.heartbeat.HeartbeatManagerSenderImpl;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
@@ -167,6 +170,16 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 			this.runningJobsRegistry = haServices.getRunningJobsRegistry();
 			this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
 
+			// heartbeat manager last
+			final ResourceID resourceID = ResourceID.generate();
+			final HeartbeatManagerSenderImpl<Void, Void> jobManagerHeartbeatManager = new HeartbeatManagerSenderImpl<>(
+					configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL),
+					configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT),
+					resourceID,
+					rpcService.getExecutor(),
+					rpcService.getScheduledExecutor(),
+					log);
+
 			// now start the JobManager
 			this.jobManager = new JobMaster(
 					jobGraph, configuration,
@@ -177,6 +190,8 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 					jobManagerServices.restartStrategyFactory,
 					jobManagerServices.rpcAskTimeout,
 					jobManagerMetrics,
+					resourceID,
+					jobManagerHeartbeatManager,
 					this,
 					this,
 					userCodeLoader);

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 145216d..16c243c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -50,6 +50,9 @@ import org.apache.flink.runtime.executiongraph.IntermediateResult;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatListener;
+import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
+import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
 import org.apache.flink.runtime.instance.Slot;
@@ -146,6 +149,9 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	/** The metrics for the job */
 	private final MetricGroup jobMetricGroup;
 
+	/** The heartbeat manager with task managers */
+	private final HeartbeatManagerImpl<Void, Void> heartbeatManager;
+
 	/** The execution context which is used to execute futures */
 	private final ExecutorService executionContext;
 
@@ -164,6 +170,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 	private volatile UUID leaderSessionID;
 
+	private final ResourceID resourceID;
+
 	// --------- ResourceManager --------
 
 	/** Leader retriever service used to locate ResourceManager's address */
@@ -188,6 +196,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			RestartStrategyFactory restartStrategyFactory,
 			Time rpcAskTimeout,
 			@Nullable JobManagerMetricGroup jobManagerMetricGroup,
+			ResourceID resourceID,
+			HeartbeatManagerImpl<Void, Void> heartbeatManager,
 			OnCompletionActions jobCompletionActions,
 			FatalErrorHandler errorHandler,
 			ClassLoader userCodeLoader) throws Exception
@@ -203,6 +213,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		this.jobCompletionActions = checkNotNull(jobCompletionActions);
 		this.errorHandler = checkNotNull(errorHandler);
 		this.userCodeLoader = checkNotNull(userCodeLoader);
+		this.resourceID = checkNotNull(resourceID);
+		this.heartbeatManager = checkNotNull(heartbeatManager);
 
 		final String jobName = jobGraph.getName();
 		final JobID jid = jobGraph.getJobID();
@@ -276,6 +288,27 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			// make sure we receive RPC and async calls
 			super.start();
 
+			heartbeatManager.start(new HeartbeatListener<Void, Void>() {
+				@Override
+				public void notifyHeartbeatTimeout(ResourceID resourceID) {
+					log.info("Notify heartbeat timeout with task manager {}", resourceID);
+					heartbeatManager.unmonitorTarget(resourceID);
+
+					getSelf().disconnectTaskManager(resourceID);
+				}
+
+				@Override
+				public void reportPayload(ResourceID resourceID, Void payload) {
+					// currently there is no payload from task manager and resource manager, so this method will not be called.
+				}
+
+				@Override
+				public Future<Void> retrievePayload() {
+					// currently no payload is needed.
+					return null;
+				}
+			});
+
 			log.info("JobManager started as leader {} for job {}", leaderSessionID, jobGraph.getJobID());
 			getSelf().startJobExecution();
 		}
@@ -290,6 +323,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	@Override
 	public void shutDown() throws Exception {
 		// make sure there is a graceful exit
+		heartbeatManager.stop();
 		getSelf().suspendExecution(new Exception("JobManager is shutting down."));
 		super.shutDown();
 	}
@@ -512,7 +546,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 	@RpcMethod
 	public void disconnectTaskManager(final ResourceID resourceID) {
-		throw new UnsupportedOperationException();
+		registeredTaskManagers.remove(resourceID);
+		slotPoolGateway.releaseTaskManager(resourceID);
 	}
 
 	// TODO: This method needs a leader session ID
@@ -708,7 +743,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 		if (registeredTaskManagers.containsKey(taskManagerId)) {
 			final RegistrationResponse response = new JMTMRegistrationSuccess(
-					taskManagerId, libraryCacheManager.getBlobServerPort());
+					resourceID, libraryCacheManager.getBlobServerPort());
 			return FlinkCompletableFuture.completed(response);
 		} else {
 			return getRpcService().execute(new Callable<TaskExecutorGateway>() {
@@ -719,7 +754,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 				}
 			}).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
 				@Override
-				public RegistrationResponse apply(TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
+				public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
 					if (throwable != null) {
 						return new RegistrationResponse.Decline(throwable.getMessage());
 					}
@@ -734,7 +769,21 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 					slotPoolGateway.registerTaskManager(taskManagerId);
 					registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));
-					return new JMTMRegistrationSuccess(taskManagerId, libraryCacheManager.getBlobServerPort());
+
+					// monitor the task manager as heartbeat target
+					heartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>() {
+						@Override
+						public void sendHeartbeat(ResourceID resourceID, Void payload) {
+							// the task manager will not request heartbeat, so this method will never be called currently
+						}
+
+						@Override
+						public void requestHeartbeat(ResourceID resourceID, Void payload) {
+							taskExecutorGateway.heartbeatFromJobManager(resourceID);
+						}
+					});
+
+					return new JMTMRegistrationSuccess(resourceID, libraryCacheManager.getBlobServerPort());
 				}
 			}, getMainThreadExecutor());
 		}
@@ -748,6 +797,11 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		// TODO: Implement disconnect behaviour
 	}
 
+	@RpcMethod
+	public void heartbeatFromTaskManager(final ResourceID resourceID) {
+		heartbeatManager.sendHeartbeat(resourceID, null);
+	}
+
 	//----------------------------------------------------------------------------------------------
 	// Internal methods
 	//----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index de7646b..e7e3111 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -218,4 +218,11 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 			final TaskManagerLocation taskManagerLocation,
 			final UUID leaderId,
 			@RpcTimeout final Time timeout);
+
+	/**
+	 * Sends a heartbeat from the task manager to the job manager.
+	 *
+	 * @param resourceID unique id of the task manager
+	 */
+	void heartbeatFromTaskManager(final ResourceID resourceID);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
index 2b224bc..98c7bf1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
@@ -33,6 +35,12 @@ import java.util.UUID;
  */
 public class JobManagerConnection {
 
+	// ID of the job associated with the job manager
+	private final JobID jobID;
+
+	// The unique id used for identifying the job manager
+	private final ResourceID resourceID;
+
 	// Job master leader session id
 	private final UUID leaderId;
 
@@ -55,6 +63,8 @@ public class JobManagerConnection {
 	private final PartitionProducerStateChecker partitionStateChecker;
 
 	public JobManagerConnection(
+		JobID jobID,
+		ResourceID resourceID,
 		JobMasterGateway jobMasterGateway,
 		UUID leaderId,
 		TaskManagerActions taskManagerActions,
@@ -62,6 +72,8 @@ public class JobManagerConnection {
 		LibraryCacheManager libraryCacheManager,
 		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
 		PartitionProducerStateChecker partitionStateChecker) {
+		this.jobID = Preconditions.checkNotNull(jobID);
+		this.resourceID = Preconditions.checkNotNull(resourceID);
 		this.leaderId = Preconditions.checkNotNull(leaderId);
 		this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
 		this.taskManagerActions = Preconditions.checkNotNull(taskManagerActions);
@@ -71,6 +83,14 @@ public class JobManagerConnection {
 		this.partitionStateChecker = Preconditions.checkNotNull(partitionStateChecker);
 	}
 
+	public JobID getJobID() {
+		return jobID;
+	}
+
+	public ResourceID getResourceID() {
+		return resourceID;
+	}
+
 	public UUID getLeaderId() {
 		return leaderId;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index df5765a..e601b0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -38,6 +38,9 @@ import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.heartbeat.HeartbeatListener;
+import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
+import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
@@ -127,6 +130,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	/** The metric registry in the task manager */
 	private final MetricRegistry metricRegistry;
 
+	/** The heartbeat manager for job manager in the task manager */
+	private final HeartbeatManagerImpl<Void, Void> jobManagerHeartbeatManager;
+
 	/** The fatal error handler to use in case of a fatal error */
 	private final FatalErrorHandler fatalErrorHandler;
 
@@ -163,6 +169,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		NetworkEnvironment networkEnvironment,
 		HighAvailabilityServices haServices,
 		MetricRegistry metricRegistry,
+		HeartbeatManagerImpl<Void, Void> jobManagerHeartbeatManager,
 		TaskManagerMetricGroup taskManagerMetricGroup,
 		BroadcastVariableManager broadcastVariableManager,
 		FileCache fileCache,
@@ -182,6 +189,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		this.networkEnvironment = checkNotNull(networkEnvironment);
 		this.haServices = checkNotNull(haServices);
 		this.metricRegistry = checkNotNull(metricRegistry);
+		this.jobManagerHeartbeatManager = checkNotNull(jobManagerHeartbeatManager);
 		this.taskSlotTable = checkNotNull(taskSlotTable);
 		this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
 		this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup);
@@ -213,6 +221,38 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 		// start the job leader service
 		jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
+
+		// start the heartbeat manager for monitoring job manager
+		jobManagerHeartbeatManager.start(new HeartbeatListener<Void, Void>() {
+			@Override
+			public void notifyHeartbeatTimeout(final ResourceID resourceID) {
+				runAsync(new Runnable() {
+					@Override
+					public void run() {
+						log.info("Notify heartbeat timeout with job manager {}", resourceID);
+						jobManagerHeartbeatManager.unmonitorTarget(resourceID);
+
+						if (jobManagerConnections.containsKey(resourceID)) {
+							JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID);
+							if (jobManagerConnection != null) {
+								closeJobManagerConnection(jobManagerConnection.getJobID());
+							}
+						}
+					}
+				});
+			}
+
+			@Override
+			public void reportPayload(ResourceID resourceID, Void payload) {
+				// currently there is no payload from job manager, so this method will not be called.
+			}
+
+			@Override
+			public Future<Void> retrievePayload() {
+				// currently no payload is needed.
+				return null;
+			}
+		});
 	}
 
 	/**
@@ -230,6 +270,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			resourceManagerConnection.close();
 		}
 
+		jobManagerHeartbeatManager.stop();
+
 		ioManager.shutdown();
 
 		memoryManager.shutdown();
@@ -472,6 +514,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	}
 
 	// ----------------------------------------------------------------------
+	// Heartbeat RPC
+	// ----------------------------------------------------------------------
+
+	@RpcMethod
+	public void heartbeatFromJobManager(ResourceID resourceID) {
+		jobManagerHeartbeatManager.requestHeartbeat(resourceID, null);
+	}
+
+	// ----------------------------------------------------------------------
 	// Checkpointing RPCs
 	// ----------------------------------------------------------------------
 
@@ -729,20 +780,39 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 	}
 
-	private void establishJobManagerConnection(JobID jobId, JobMasterGateway jobMasterGateway, UUID jobManagerLeaderId, JMTMRegistrationSuccess registrationSuccess) {
+	private void establishJobManagerConnection(JobID jobId, final JobMasterGateway jobMasterGateway, UUID jobManagerLeaderId, JMTMRegistrationSuccess registrationSuccess) {
 		log.info("Establish JobManager connection for job {}.", jobId);
 
 		if (jobManagerTable.contains(jobId)) {
 			JobManagerConnection oldJobManagerConnection = jobManagerTable.get(jobId);
-
 			if (!oldJobManagerConnection.getLeaderId().equals(jobManagerLeaderId)) {
 				closeJobManagerConnection(jobId);
-				jobManagerTable.put(jobId, associateWithJobManager(jobMasterGateway, jobManagerLeaderId, registrationSuccess.getBlobPort()));
 			}
-		} else {
-			jobManagerTable.put(jobId, associateWithJobManager(jobMasterGateway, jobManagerLeaderId, registrationSuccess.getBlobPort()));
 		}
 
+		ResourceID jobManagerResourceID = registrationSuccess.getResourceID();
+		JobManagerConnection newJobManagerConnection = associateWithJobManager(
+				jobId,
+				jobManagerResourceID,
+				jobMasterGateway,
+				jobManagerLeaderId,
+				registrationSuccess.getBlobPort());
+		jobManagerConnections.put(jobManagerResourceID, newJobManagerConnection);
+		jobManagerTable.put(jobId, newJobManagerConnection);
+
+		// monitor the job manager as heartbeat target
+		jobManagerHeartbeatManager.monitorTarget(jobManagerResourceID, new HeartbeatTarget<Void>() {
+			@Override
+			public void sendHeartbeat(ResourceID resourceID, Void payload) {
+				jobMasterGateway.heartbeatFromTaskManager(resourceID);
+			}
+
+			@Override
+			public void requestHeartbeat(ResourceID resourceID, Void payload) {
+				// requestHeartbeat is never called on the task manager side
+			}
+		});
+
 		offerSlotsToJobManager(jobId);
 	}
 
@@ -777,6 +847,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 		if (jobManagerConnection != null) {
 			try {
+				jobManagerConnections.remove(jobManagerConnection.getResourceID());
 				disassociateFromJobManager(jobManagerConnection);
 			} catch (IOException e) {
 				log.warn("Could not properly disassociate from JobManager {}.",
@@ -785,7 +856,14 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 	}
 
-	private JobManagerConnection associateWithJobManager(JobMasterGateway jobMasterGateway, UUID jobManagerLeaderId, int blobPort) {
+	private JobManagerConnection associateWithJobManager(
+			JobID jobID,
+			ResourceID resourceID,
+			JobMasterGateway jobMasterGateway,
+			UUID jobManagerLeaderId,
+			int blobPort) {
+		Preconditions.checkNotNull(jobID);
+		Preconditions.checkNotNull(resourceID);
 		Preconditions.checkNotNull(jobManagerLeaderId);
 		Preconditions.checkNotNull(jobMasterGateway);
 		Preconditions.checkArgument(blobPort > 0 || blobPort < MAX_BLOB_PORT, "Blob server port is out of range.");
@@ -820,6 +898,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		PartitionProducerStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobManagerLeaderId, jobMasterGateway);
 
 		return new JobManagerConnection(
+			jobID,
+			resourceID,
 			jobMasterGateway,
 			jobManagerLeaderId,
 			taskManagerActions,

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 36a3255..95db932 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
@@ -131,4 +132,11 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * @return Future acknowledge if the task is successfully canceled
 	 */
 	Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
+
+	/**
+	 * Heartbeat request sent by the job manager
+	 *
+	 * @param resourceID unique id of the job manager
+	 */
+	void heartbeatFromJobManager(ResourceID resourceID);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 3500f6d..402421c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -21,9 +21,11 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
@@ -112,6 +114,13 @@ public class TaskManagerRunner implements FatalErrorHandler {
 		// Initialize the TM metrics
 		TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
 
+		HeartbeatManagerImpl<Void, Void> heartbeatManager = new HeartbeatManagerImpl<>(
+				configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT),
+				resourceID,
+				executor,
+				rpcService.getScheduledExecutor(),
+				LOG);
+
 		this.taskManager = new TaskExecutor(
 			taskManagerConfiguration,
 			taskManagerServices.getTaskManagerLocation(),
@@ -121,6 +130,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
 			taskManagerServices.getNetworkEnvironment(),
 			highAvailabilityServices,
 			metricRegistry,
+			heartbeatManager,
 			taskManagerMetricGroup,
 			taskManagerServices.getBroadcastVariableManager(),
 			taskManagerServices.getFileCache(),

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
index 0a8923d..3da18aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.heartbeat;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.util.DirectExecutorService;
 import org.apache.flink.util.TestLogger;
@@ -63,7 +65,7 @@ public class HeartbeatManagerTest extends TestLogger {
 		ResourceID ownResourceID = new ResourceID("foobar");
 		ResourceID targetResourceID = new ResourceID("barfoo");
 		HeartbeatListener<Object, Object> heartbeatListener = mock(HeartbeatListener.class);
-		ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class);
+		ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
 
 		Object expectedObject = new Object();
 
@@ -74,7 +76,7 @@ public class HeartbeatManagerTest extends TestLogger {
 			ownResourceID,
 			heartbeatListener,
 			new DirectExecutorService(),
-			scheduledExecutorService,
+			scheduledExecutor,
 			LOG);
 
 		HeartbeatTarget<Object> heartbeatTarget = mock(HeartbeatTarget.class);
@@ -101,10 +103,10 @@ public class HeartbeatManagerTest extends TestLogger {
 		ResourceID ownResourceID = new ResourceID("foobar");
 		ResourceID targetResourceID = new ResourceID("barfoo");
 		HeartbeatListener<Object, Object> heartbeatListener = mock(HeartbeatListener.class);
-		ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class);
+		ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
 		ScheduledFuture<?> scheduledFuture = mock(ScheduledFuture.class);
 
-		doReturn(scheduledFuture).when(scheduledExecutorService).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+		doReturn(scheduledFuture).when(scheduledExecutor).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
 
 		Object expectedObject = new Object();
 
@@ -115,7 +117,7 @@ public class HeartbeatManagerTest extends TestLogger {
 			ownResourceID,
 			heartbeatListener,
 			new DirectExecutorService(),
-			scheduledExecutorService,
+			scheduledExecutor,
 			LOG);
 
 		HeartbeatTarget<Object> heartbeatTarget = mock(HeartbeatTarget.class);
@@ -125,7 +127,7 @@ public class HeartbeatManagerTest extends TestLogger {
 		heartbeatManager.receiveHeartbeat(targetResourceID, expectedObject);
 
 		verify(scheduledFuture, times(1)).cancel(true);
-		verify(scheduledExecutorService, times(2)).schedule(any(Runnable.class), eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS));
+		verify(scheduledExecutor, times(2)).schedule(any(Runnable.class), eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS));
 	}
 
 	/**
@@ -155,7 +157,7 @@ public class HeartbeatManagerTest extends TestLogger {
 			ownResourceID,
 			heartbeatListener,
 			new DirectExecutorService(),
-			new ScheduledThreadPoolExecutor(1),
+			new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)),
 			LOG);
 
 		HeartbeatTarget<Object> heartbeatTarget = mock(HeartbeatTarget.class);
@@ -205,7 +207,7 @@ public class HeartbeatManagerTest extends TestLogger {
 			resourceID,
 			heartbeatListener,
 			new DirectExecutorService(),
-			new ScheduledThreadPoolExecutor(1),
+			new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)),
 			LOG);
 
 		HeartbeatManagerSenderImpl<Object, Object> heartbeatManager2 = new HeartbeatManagerSenderImpl<>(
@@ -214,7 +216,7 @@ public class HeartbeatManagerTest extends TestLogger {
 			resourceID2,
 			heartbeatListener2,
 			new DirectExecutorService(),
-			new ScheduledThreadPoolExecutor(1),
+			new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)),
 			LOG);;
 
 		heartbeatManager.monitorTarget(resourceID2, heartbeatManager2);
@@ -254,7 +256,7 @@ public class HeartbeatManagerTest extends TestLogger {
 			resourceID,
 			heartbeatListener,
 			new DirectExecutorService(),
-			new ScheduledThreadPoolExecutor(1),
+			new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)),
 			LOG);
 
 		heartbeatManager.monitorTarget(targetID, mock(HeartbeatTarget.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
new file mode 100644
index 0000000..cdad87f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatManagerSenderImpl;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatManagerSenderImpl;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.SlotPoolGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.net.InetAddress;
+import java.net.URL;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.*;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(BlobLibraryCacheManager.class)
+public class JobMasterTest extends TestLogger {
+
+	@Test
+	public void testHeartbeatTimeoutWithTaskManager() throws Exception {
+		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
+		haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
+		haServices.setCheckpointRecoveryFactory(mock(CheckpointRecoveryFactory.class));
+		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+		final String jobManagerAddress = "jm";
+		final UUID jmLeaderId = UUID.randomUUID();
+		final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
+
+		final String taskManagerAddress = "tm";
+		final ResourceID tmResourceId = new ResourceID(taskManagerAddress);
+		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
+		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+
+		final TestingSerialRpcService rpc = new TestingSerialRpcService();
+		rpc.registerGateway(taskManagerAddress, taskExecutorGateway);
+
+		final long heartbeatInterval = 1L;
+		final long heartbeatTimeout = 5L;
+		final CountDownLatch waitLatch = new CountDownLatch(1);
+		final HeartbeatManagerSenderImpl<Void, Void> jmHeartbeatManager = new TestingHeartbeatManagerSenderImpl<>(
+				waitLatch,
+				heartbeatInterval,
+				heartbeatTimeout,
+				jmResourceId,
+				rpc.getExecutor(),
+				rpc.getScheduledExecutor(),
+				log);
+
+		try {
+			final JobMaster jobMaster = new JobMaster(
+					new JobGraph(),
+					new Configuration(),
+					rpc,
+					haServices,
+					Executors.newScheduledThreadPool(1),
+					mock(BlobLibraryCacheManager.class),
+					mock(RestartStrategyFactory.class),
+					Time.of(10, TimeUnit.SECONDS),
+					null,
+					jmResourceId,
+					jmHeartbeatManager,
+					mock(OnCompletionActions.class),
+					testingFatalErrorHandler,
+					new FlinkUserCodeClassLoader(new URL[0]));
+
+			// also start the heartbeat manager in job manager
+			jobMaster.start(jmLeaderId);
+
+			// registering the task manager triggers monitoring it as a heartbeat target and schedules heartbeat requests at the heartbeat interval
+			jobMaster.registerTaskManager(taskManagerAddress, taskManagerLocation, jmLeaderId);
+
+			verify(taskExecutorGateway, atLeast(1)).heartbeatFromJobManager(eq(jmResourceId));
+
+			final ConcurrentHashMap<ResourceID, Object> heartbeatTargets = Whitebox.getInternalState(jmHeartbeatManager, "heartbeatTargets");
+			final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTMsInJM = Whitebox.getInternalState(jobMaster, "registeredTaskManagers");
+			final SlotPoolGateway slotPoolGateway = mock(SlotPoolGateway.class);
+			Whitebox.setInternalState(jobMaster, "slotPoolGateway", slotPoolGateway);
+
+			// before heartbeat timeout
+			assertTrue(heartbeatTargets.containsKey(tmResourceId));
+			assertTrue(registeredTMsInJM.containsKey(tmResourceId));
+
+			// continue to unmonitor heartbeat target
+			waitLatch.countDown();
+
+			// after heartbeat timeout
+			verify(slotPoolGateway, timeout(heartbeatTimeout * 5)).releaseTaskManager(eq(tmResourceId));
+			assertFalse(heartbeatTargets.containsKey(tmResourceId));
+			assertFalse(registeredTMsInJM.containsKey(tmResourceId));
+
+			// check if a concurrent error occurred
+			testingFatalErrorHandler.rethrowError();
+
+		} finally {
+			rpc.stopService();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 076d126..5ffc97e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
@@ -103,6 +104,7 @@ public class TaskExecutorITCase {
 			rpcService.getScheduledExecutor(),
 			resourceManagerConfiguration.getJobTimeout());
 		MetricRegistry metricRegistry = mock(MetricRegistry.class);
+		HeartbeatManagerImpl heartbeatManager = mock(HeartbeatManagerImpl.class);
 
 		final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
 		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(taskManagerResourceId, InetAddress.getLocalHost(), 1234);
@@ -134,6 +136,7 @@ public class TaskExecutorITCase {
 			networkEnvironment,
 			testingHAServices,
 			metricRegistry,
+			heartbeatManager,
 			taskManagerMetricGroup,
 			broadcastVariableManager,
 			fileCache,

http://git-wip-us.apache.org/repos/asf/flink/blob/0b3d5c27/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index d413a01..f500246 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -39,6 +39,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatManagerImpl;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.NonHaServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -80,31 +82,131 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 import org.mockito.Matchers;
+import org.powermock.reflect.Whitebox;
 
 import java.net.InetAddress;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 
 import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyInt;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 public class TaskExecutorTest extends TestLogger {
 
 	@Rule
 	public TestName name = new TestName();
 
+	@Test
+	public void testHeartbeatTimeoutWithJobManager() throws Exception {
+		final JobID jobId = new JobID();
+		final Configuration configuration = new Configuration();
+		final TaskManagerConfiguration tmConfig = TaskManagerConfiguration.fromConfiguration(configuration);
+		final ResourceID tmResourceId = new ResourceID("tm");
+		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
+		final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class)), mock(TimerService.class));
+
+		final TestingSerialRpcService rpc = new TestingSerialRpcService();
+		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
+		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
+		final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService();
+		haServices.setJobMasterLeaderRetriever(jobId, jmLeaderRetrievalService);
+		haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
+
+		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+		final CountDownLatch waitLatch =  new CountDownLatch(1);
+		final long heartbeatTimeout = 10L;
+		final HeartbeatManagerImpl<Void, Void> tmHeartbeatManager = new TestingHeartbeatManagerImpl<>(
+				waitLatch,
+				heartbeatTimeout,
+				tmResourceId,
+				rpc.getExecutor(),
+				rpc.getScheduledExecutor(),
+				log);
+
+		final String jobMasterAddress = "jm";
+		final UUID jmLeaderId = UUID.randomUUID();
+		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
+		final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
+		final int blobPort = 42;
+
+		when(jobMasterGateway.registerTaskManager(
+				any(String.class),
+				eq(taskManagerLocation),
+				eq(jmLeaderId),
+				any(Time.class)
+		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
+		when(jobMasterGateway.getAddress()).thenReturn(jobMasterAddress);
+
+		try {
+			final TaskExecutor taskManager = new TaskExecutor(
+					tmConfig,
+					taskManagerLocation,
+					rpc,
+					mock(MemoryManager.class),
+					mock(IOManager.class),
+					mock(NetworkEnvironment.class),
+					haServices,
+					mock(MetricRegistry.class),
+					tmHeartbeatManager,
+					mock(TaskManagerMetricGroup.class),
+					mock(BroadcastVariableManager.class),
+					mock(FileCache.class),
+					taskSlotTable,
+					new JobManagerTable(),
+					jobLeaderService,
+					testingFatalErrorHandler);
+
+			taskManager.start();
+
+			rpc.registerGateway(jobMasterAddress, jobMasterGateway);
+
+			// we have to add the job after the TaskExecutor, because otherwise the service has not
+			// been properly started.
+			jobLeaderService.addJob(jobId, jobMasterAddress);
+
+			// now inform the task manager about the new job leader
+			jmLeaderRetrievalService.notifyListener(jobMasterAddress, jmLeaderId);
+
+			// a successful task manager registration triggers heartbeat monitoring between tm and jm
+			verify(jobMasterGateway).registerTaskManager(
+					eq(taskManager.getAddress()), eq(taskManagerLocation), eq(jmLeaderId), any(Time.class));
+
+			final ConcurrentHashMap<ResourceID, Object> heartbeatTargets = Whitebox.getInternalState(tmHeartbeatManager, "heartbeatTargets");
+			final JobManagerTable jobManagerTable = Whitebox.getInternalState(taskManager, "jobManagerTable");
+			final Map<ResourceID, JobManagerConnection> jobManagerConnections = Whitebox.getInternalState(taskManager, "jobManagerConnections");
+
+			// before heartbeat timeout
+			assertTrue(heartbeatTargets.containsKey(jmResourceId));
+			assertTrue(jobManagerTable.contains(jobId));
+			assertTrue(jobManagerConnections.containsKey(jmResourceId));
+
+			// continue to unmonitor heartbeat target
+			waitLatch.countDown();
+
+			// after heartbeat timeout
+			verify(jobMasterGateway, timeout(heartbeatTimeout)).disconnectTaskManager(eq(tmResourceId));
+			assertFalse(heartbeatTargets.containsKey(jmResourceId));
+			assertFalse(jobManagerTable.contains(jobId));
+			assertFalse(jobManagerConnections.containsKey(jmResourceId));
+
+			// check if a concurrent error occurred
+			testingFatalErrorHandler.rethrowError();
+
+		} finally {
+			rpc.stopService();
+		}
+	}
 
 	@Test
 	public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
@@ -146,6 +248,7 @@ public class TaskExecutorTest extends TestLogger {
 				mock(NetworkEnvironment.class),
 				haServices,
 				mock(MetricRegistry.class),
+				mock(HeartbeatManagerImpl.class),
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
@@ -222,6 +325,7 @@ public class TaskExecutorTest extends TestLogger {
 				mock(NetworkEnvironment.class),
 				haServices,
 				mock(MetricRegistry.class),
+				mock(HeartbeatManagerImpl.class),
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
@@ -310,6 +414,8 @@ public class TaskExecutorTest extends TestLogger {
 		when(libraryCacheManager.getClassLoader(eq(jobId))).thenReturn(getClass().getClassLoader());
 
 		final JobManagerConnection jobManagerConnection = new JobManagerConnection(
+			jobId,
+			ResourceID.generate(),
 			mock(JobMasterGateway.class),
 			jobManagerLeaderId,
 			mock(TaskManagerActions.class),
@@ -351,6 +457,7 @@ public class TaskExecutorTest extends TestLogger {
 				networkEnvironment,
 				haServices,
 				mock(MetricRegistry.class),
+				mock(HeartbeatManagerImpl.class),
 				taskManagerMetricGroup,
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
@@ -457,6 +564,7 @@ public class TaskExecutorTest extends TestLogger {
 				mock(NetworkEnvironment.class),
 				haServices,
 				mock(MetricRegistry.class),
+				mock(HeartbeatManagerImpl.class),
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
@@ -570,6 +678,7 @@ public class TaskExecutorTest extends TestLogger {
 				mock(NetworkEnvironment.class),
 				haServices,
 				mock(MetricRegistry.class),
+				mock(HeartbeatManagerImpl.class),
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
@@ -644,6 +753,7 @@ public class TaskExecutorTest extends TestLogger {
 				mock(NetworkEnvironment.class),
 				haServices,
 				mock(MetricRegistry.class),
+				mock(HeartbeatManagerImpl.class),
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
@@ -764,6 +874,8 @@ public class TaskExecutorTest extends TestLogger {
 		when(libraryCacheManager.getClassLoader(eq(jobId))).thenReturn(getClass().getClassLoader());
 
 		final JobManagerConnection jobManagerConnection = new JobManagerConnection(
+			jobId,
+			jmResourceId,
 			jobMasterGateway,
 			jobManagerLeaderId,
 			mock(TaskManagerActions.class),
@@ -784,6 +896,7 @@ public class TaskExecutorTest extends TestLogger {
 				mock(NetworkEnvironment.class),
 				haServices,
 				mock(MetricRegistry.class),
+				mock(HeartbeatManagerImpl.class),
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),


Mime
View raw message