flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-7831] Make last received heartbeat retrievable
Date Fri, 20 Oct 2017 12:16:37 GMT
Repository: flink
Updated Branches:
  refs/heads/master 61c262958 -> 06b6c87a6


[FLINK-7831] Make last received heartbeat retrievable

This commit adds functionality to retrieve the last received heartbeat from
the HeartbeatManager.

This closes #4817.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06b6c87a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06b6c87a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06b6c87a

Branch: refs/heads/master
Commit: 06b6c87a63397ba0251c0b0aba72abbc4984557d
Parents: 61c2629
Author: Till <till.rohrmann@gmail.com>
Authored: Thu Oct 12 18:24:02 2017 +0200
Committer: Till <till.rohrmann@gmail.com>
Committed: Fri Oct 20 14:14:57 2017 +0200

----------------------------------------------------------------------
 .../runtime/heartbeat/HeartbeatManager.java     |  8 +++
 .../runtime/heartbeat/HeartbeatManagerImpl.java | 20 +++++++
 .../runtime/heartbeat/HeartbeatManagerTest.java | 59 ++++++++++++++++++++
 3 files changed, 87 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/06b6c87a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java
index 928c826..555ceca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java
@@ -50,4 +50,12 @@ public interface HeartbeatManager<I, O> extends HeartbeatTarget<I> {
 	 * Stops the heartbeat manager.
 	 */
 	void stop();
+
+	/**
+	 * Returns the last received heartbeat from the given target.
+	 *
+	 * @param resourceId for which to return the last heartbeat
+	 * @return Last heartbeat received from the given target or -1 if the target is not being monitored.
+	 */
+	long getLastHeartbeatFrom(ResourceID resourceId);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/06b6c87a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
index 99f44f9..09c4b46 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
@@ -163,6 +163,17 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
 		heartbeatTargets.clear();
 	}
 
+	@Override
+	public long getLastHeartbeatFrom(ResourceID resourceId) {
+		HeartbeatMonitor<O> heartbeatMonitor = heartbeatTargets.get(resourceId);
+
+		if (heartbeatMonitor != null) {
+			return heartbeatMonitor.getLastHeartbeat();
+		} else {
+			return -1L;
+		}
+	}
+
 	//----------------------------------------------------------------------------------------------
 	// HeartbeatTarget methods
 	//----------------------------------------------------------------------------------------------
@@ -252,6 +263,8 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
 
 		private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
 
+		private volatile long lastHeartbeat;
+
 		HeartbeatMonitor(
 			ResourceID resourceID,
 			HeartbeatTarget<O> heartbeatTarget,
@@ -267,6 +280,8 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
 			Preconditions.checkArgument(heartbeatTimeoutIntervalMs >= 0L, "The heartbeat timeout interval has to be larger than 0.");
 			this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;
 
+			lastHeartbeat = 0L;
+
 			resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
 		}
 
@@ -274,7 +289,12 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
 			return heartbeatTarget;
 		}
 
+		public long getLastHeartbeat() {
+			return lastHeartbeat;
+		}
+
 		void reportHeartbeat() {
+			lastHeartbeat = System.currentTimeMillis();
 			resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/06b6c87a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
index 593daf7..1727a59 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.heartbeat;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.util.DirectExecutorService;
@@ -274,6 +275,64 @@ public class HeartbeatManagerTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that the last heartbeat from an unregistered target equals -1.
+	 */
+	@Test
+	public void testLastHeartbeatFromUnregisteredTarget() {
+		final long heartbeatTimeout = 100L;
+		final ResourceID resourceId = ResourceID.generate();
+		final HeartbeatListener<Object, Object> heartbeatListener = mock(HeartbeatListener.class);
+
+		HeartbeatManager<?, ?> heartbeatManager = new HeartbeatManagerImpl<>(
+			heartbeatTimeout,
+			resourceId,
+			heartbeatListener,
+			Executors.directExecutor(),
+			mock(ScheduledExecutor.class),
+			LOG);
+
+		try {
+			assertEquals(-1L, heartbeatManager.getLastHeartbeatFrom(ResourceID.generate()));
+		} finally {
+			heartbeatManager.stop();
+		}
+	}
+
+	/**
+	 * Tests that we can correctly retrieve the last heartbeat for registered targets.
+	 */
+	@Test
+	public void testLastHeartbeatFrom() {
+		final long heartbeatTimeout = 100L;
+		final ResourceID resourceId = ResourceID.generate();
+		final HeartbeatListener<Object, Object> heartbeatListener = mock(HeartbeatListener.class);
+		final HeartbeatTarget<Object> heartbeatTarget = mock(HeartbeatTarget.class);
+		final ResourceID target = ResourceID.generate();
+
+		HeartbeatManager<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>(
+			heartbeatTimeout,
+			resourceId,
+			heartbeatListener,
+			Executors.directExecutor(),
+			mock(ScheduledExecutor.class),
+			LOG);
+
+		try {
+			heartbeatManager.monitorTarget(target, heartbeatTarget);
+
+			assertEquals(0L, heartbeatManager.getLastHeartbeatFrom(target));
+
+			final long currentTime = System.currentTimeMillis();
+
+			heartbeatManager.receiveHeartbeat(target, null);
+
+			assertTrue(heartbeatManager.getLastHeartbeatFrom(target) >= currentTime);
+		} finally {
+			heartbeatManager.stop();
+		}
+	}
+
 	static class TestingHeartbeatListener implements HeartbeatListener<Object, Object> {
 
 		private final CompletableFuture<ResourceID> future = new CompletableFuture<>();


Mime
View raw message