flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/3] flink git commit: [FLINK-8942][runtime] Pass heartbeat target ResourceID
Date Tue, 20 Mar 2018 16:37:46 GMT
Repository: flink
Updated Branches:
  refs/heads/master 4536e9cbe -> c90a757b2


[FLINK-8942][runtime] Pass heartbeat target ResourceID

received payload field now volatile

Add HeartbeatMonitor#getHeartbeatTargetId

This closes #5699.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f9fbbc3a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f9fbbc3a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f9fbbc3a

Branch: refs/heads/master
Commit: f9fbbc3a137276cab4b8abf272199f1cd4633d29
Parents: 4536e9c
Author: zentol <chesnay@apache.org>
Authored: Wed Mar 14 14:21:27 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Mar 20 14:53:43 2018 +0100

----------------------------------------------------------------------
 .../runtime/heartbeat/HeartbeatListener.java    |   3 +-
 .../runtime/heartbeat/HeartbeatManagerImpl.java |   8 +-
 .../heartbeat/HeartbeatManagerSenderImpl.java   |   2 +-
 .../flink/runtime/jobmaster/JobMaster.java      |   4 +-
 .../resourcemanager/ResourceManager.java        |   4 +-
 .../runtime/taskexecutor/TaskExecutor.java      |   4 +-
 .../runtime/heartbeat/HeartbeatManagerTest.java | 167 ++++++++++++++++++-
 7 files changed, 177 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f9fbbc3a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
index 734eb4c..01a4754 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
@@ -57,7 +57,8 @@ public interface HeartbeatListener<I, O> {
 	 * Retrieves the payload value for the next heartbeat message. Since the operation can happen
 	 * asynchronously, the result is returned wrapped in a future.
 	 *
+	 * @param resourceID Resource ID identifying the receiver of the payload
 	 * @return Future containing the next payload for heartbeats
 	 */
-	CompletableFuture<O> retrievePayload();
+	CompletableFuture<O> retrievePayload(ResourceID resourceID);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f9fbbc3a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
index 09c4b46..42268fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
@@ -106,7 +106,7 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I,
O> {
 		return heartbeatListener;
 	}
 
-	Collection<HeartbeatManagerImpl.HeartbeatMonitor<O>> getHeartbeatTargets() {
+	Collection<HeartbeatMonitor<O>> getHeartbeatTargets() {
 		return heartbeatTargets.values();
 	}
 
@@ -202,7 +202,7 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I,
O> {
 					heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
 				}
 
-				CompletableFuture<O> futurePayload = heartbeatListener.retrievePayload();
+				CompletableFuture<O> futurePayload = heartbeatListener.retrievePayload(requestOrigin);
 
 				if (futurePayload != null) {
 					CompletableFuture<Void> sendHeartbeatFuture = futurePayload.thenAcceptAsync(
@@ -289,6 +289,10 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I,
O> {
 			return heartbeatTarget;
 		}
 
+		ResourceID getHeartbeatTargetId() {
+			return resourceID;
+		}
+
 		public long getLastHeartbeat() {
 			return lastHeartbeat;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9fbbc3a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
index eb82343..e3b939c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
@@ -63,7 +63,7 @@ public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I,
O>
 		if (!stopped) {
 			log.debug("Trigger heartbeat request.");
 			for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets()) {
-				CompletableFuture<O> futurePayload = getHeartbeatListener().retrievePayload();
+				CompletableFuture<O> futurePayload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
 				final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();
 
 				if (futurePayload != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f9fbbc3a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index ced8c7c..f0b29bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1527,7 +1527,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId>
implements JobMast
 		}
 
 		@Override
-		public CompletableFuture<Void> retrievePayload() {
+		public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
 			return CompletableFuture.completedFuture(null);
 		}
 	}
@@ -1551,7 +1551,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId>
implements JobMast
 		}
 
 		@Override
-		public CompletableFuture<Void> retrievePayload() {
+		public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
 			return CompletableFuture.completedFuture(null);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9fbbc3a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 77e4362..0ae4ab6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -1076,7 +1076,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		}
 
 		@Override
-		public CompletableFuture<Void> retrievePayload() {
+		public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
 			return CompletableFuture.completedFuture(null);
 		}
 	}
@@ -1109,7 +1109,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		}
 
 		@Override
-		public CompletableFuture<Void> retrievePayload() {
+		public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
 			return CompletableFuture.completedFuture(null);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9fbbc3a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index fc69984..7409175 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -1515,7 +1515,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway
{
 		}
 
 		@Override
-		public CompletableFuture<Void> retrievePayload() {
+		public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
 			return CompletableFuture.completedFuture(null);
 		}
 	}
@@ -1544,7 +1544,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway
{
 		}
 
 		@Override
-		public CompletableFuture<SlotReport> retrievePayload() {
+		public CompletableFuture<SlotReport> retrievePayload(ResourceID resourceID) {
 			return callAsync(
 					() -> taskSlotTable.createSlotReport(getResourceID()),
 					taskManagerConfiguration.getTimeout());

http://git-wip-us.apache.org/repos/asf/flink/blob/f9fbbc3a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
index 390a131..77d12d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.heartbeat;
 
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -75,7 +76,7 @@ public class HeartbeatManagerTest extends TestLogger {
 
 		Object expectedObject = new Object();
 
-		when(heartbeatListener.retrievePayload()).thenReturn(CompletableFuture.completedFuture(expectedObject));
+		when(heartbeatListener.retrievePayload(any(ResourceID.class))).thenReturn(CompletableFuture.completedFuture(expectedObject));
 
 		HeartbeatManagerImpl<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>(
 			heartbeatTimeout,
@@ -93,7 +94,7 @@ public class HeartbeatManagerTest extends TestLogger {
 		heartbeatManager.requestHeartbeat(targetResourceID, expectedObject);
 
 		verify(heartbeatListener, times(1)).reportPayload(targetResourceID, expectedObject);
-		verify(heartbeatListener, times(1)).retrievePayload();
+		verify(heartbeatListener, times(1)).retrievePayload(any(ResourceID.class));
 		verify(heartbeatTarget, times(1)).receiveHeartbeat(ownResourceID, expectedObject);
 
 		heartbeatManager.receiveHeartbeat(targetResourceID, expectedObject);
@@ -118,7 +119,7 @@ public class HeartbeatManagerTest extends TestLogger {
 
 		Object expectedObject = new Object();
 
-		when(heartbeatListener.retrievePayload()).thenReturn(CompletableFuture.completedFuture(expectedObject));
+		when(heartbeatListener.retrievePayload(any(ResourceID.class))).thenReturn(CompletableFuture.completedFuture(expectedObject));
 
 		HeartbeatManagerImpl<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>(
 			heartbeatTimeout,
@@ -207,7 +208,7 @@ public class HeartbeatManagerTest extends TestLogger {
 		@SuppressWarnings("unchecked")
 		HeartbeatListener<Object, Object> heartbeatListener = mock(HeartbeatListener.class);
 
-		when(heartbeatListener.retrievePayload()).thenReturn(CompletableFuture.completedFuture(object));
+		when(heartbeatListener.retrievePayload(any(ResourceID.class))).thenReturn(CompletableFuture.completedFuture(object));
 
 		TestingHeartbeatListener heartbeatListener2 = new TestingHeartbeatListener(object2);
 
@@ -347,6 +348,162 @@ public class HeartbeatManagerTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that the heartbeat target {@link ResourceID} is properly passed to the {@link HeartbeatListener}
by the
+	 * {@link HeartbeatManagerImpl}.
+	 */
+	@Test
+	public void testHeartbeatManagerTargetPayload() {
+		final long heartbeatTimeout = 100L;
+
+		final ResourceID someTargetId = ResourceID.generate();
+		final ResourceID specialTargetId = ResourceID.generate();
+		final TargetDependentHeartbeatReceiver someHeartbeatTarget = new TargetDependentHeartbeatReceiver();
+		final TargetDependentHeartbeatReceiver specialHeartbeatTarget = new TargetDependentHeartbeatReceiver();
+
+		final int defaultResponse = 0;
+		final int specialResponse = 1;
+
+		HeartbeatManager<?, Integer> heartbeatManager = new HeartbeatManagerImpl<>(
+			heartbeatTimeout,
+			ResourceID.generate(),
+			new TargetDependentHeartbeatSender(specialTargetId, specialResponse, defaultResponse),
+			Executors.directExecutor(),
+			mock(ScheduledExecutor.class),
+			LOG);
+
+		try {
+			heartbeatManager.monitorTarget(someTargetId, someHeartbeatTarget);
+			heartbeatManager.monitorTarget(specialTargetId, specialHeartbeatTarget);
+
+			heartbeatManager.requestHeartbeat(someTargetId, null);
+			assertEquals(defaultResponse, someHeartbeatTarget.getLastReceivedHeartbeatPayload());
+
+			heartbeatManager.requestHeartbeat(specialTargetId, null);
+			assertEquals(specialResponse, specialHeartbeatTarget.getLastReceivedHeartbeatPayload());
+		} finally {
+			heartbeatManager.stop();
+		}
+	}
+
+	/**
+	 * Tests that the heartbeat target {@link ResourceID} is properly passed to the {@link HeartbeatListener}
by the
+	 * {@link HeartbeatManagerSenderImpl}.
+	 */
+	@Test
+	public void testHeartbeatManagerSenderTargetPayload() throws Exception {
+		final long heartbeatTimeout = 100L;
+		final long heartbeatPeriod = 2000L;
+
+		final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
+
+		final ResourceID someTargetId = ResourceID.generate();
+		final ResourceID specialTargetId = ResourceID.generate();
+
+		final OneShotLatch someTargetReceivedLatch = new OneShotLatch();
+		final OneShotLatch specialTargetReceivedLatch = new OneShotLatch();
+
+		final TargetDependentHeartbeatReceiver someHeartbeatTarget = new TargetDependentHeartbeatReceiver(someTargetReceivedLatch);
+		final TargetDependentHeartbeatReceiver specialHeartbeatTarget = new TargetDependentHeartbeatReceiver(specialTargetReceivedLatch);
+
+		final int defaultResponse = 0;
+		final int specialResponse = 1;
+
+		HeartbeatManager<?, Integer> heartbeatManager = new HeartbeatManagerSenderImpl<>(
+			heartbeatPeriod,
+			heartbeatTimeout,
+			ResourceID.generate(),
+			new TargetDependentHeartbeatSender(specialTargetId, specialResponse, defaultResponse),
+			Executors.directExecutor(),
+			new ScheduledExecutorServiceAdapter(scheduledThreadPoolExecutor),
+			LOG);
+
+		try {
+			heartbeatManager.monitorTarget(someTargetId, someHeartbeatTarget);
+			heartbeatManager.monitorTarget(specialTargetId, specialHeartbeatTarget);
+
+			someTargetReceivedLatch.await(5, TimeUnit.SECONDS);
+			specialTargetReceivedLatch.await(5, TimeUnit.SECONDS);
+
+			assertEquals(defaultResponse, someHeartbeatTarget.getLastRequestedHeartbeatPayload());
+			assertEquals(specialResponse, specialHeartbeatTarget.getLastRequestedHeartbeatPayload());
+		} finally {
+			heartbeatManager.stop();
+			scheduledThreadPoolExecutor.shutdown();
+		}
+	}
+
+	/**
+	 * Test {@link HeartbeatTarget} that exposes the last received payload.
+	 */
+	private static class TargetDependentHeartbeatReceiver implements HeartbeatTarget<Integer>
{
+
+		private volatile int lastReceivedHeartbeatPayload = -1;
+		private volatile int lastRequestedHeartbeatPayload = -1;
+
+		private final OneShotLatch latch;
+
+		public TargetDependentHeartbeatReceiver() {
+			this(new OneShotLatch());
+		}
+
+		public TargetDependentHeartbeatReceiver(OneShotLatch latch) {
+			this.latch = latch;
+		}
+
+		@Override
+		public void receiveHeartbeat(ResourceID heartbeatOrigin, Integer heartbeatPayload) {
+			this.lastReceivedHeartbeatPayload = heartbeatPayload;
+			latch.trigger();
+		}
+
+		@Override
+		public void requestHeartbeat(ResourceID requestOrigin, Integer heartbeatPayload) {
+			this.lastRequestedHeartbeatPayload = heartbeatPayload;
+			latch.trigger();
+		}
+
+		public int getLastReceivedHeartbeatPayload() {
+			return lastReceivedHeartbeatPayload;
+		}
+
+		public int getLastRequestedHeartbeatPayload() {
+			return lastRequestedHeartbeatPayload;
+		}
+	}
+
+	/**
+	 * Test {@link HeartbeatListener} that returns different payloads based on the target {@link
ResourceID}.
+	 */
+	private static class TargetDependentHeartbeatSender implements HeartbeatListener<Object,
Integer>  {
+		private final ResourceID specialId;
+		private final int specialResponse;
+		private final int defaultResponse;
+
+		TargetDependentHeartbeatSender(ResourceID specialId, int specialResponse, int defaultResponse)
{
+			this.specialId = specialId;
+			this.specialResponse = specialResponse;
+			this.defaultResponse = defaultResponse;
+		}
+
+		@Override
+		public void notifyHeartbeatTimeout(ResourceID resourceID) {
+		}
+
+		@Override
+		public void reportPayload(ResourceID resourceID, Object payload) {
+		}
+
+		@Override
+		public CompletableFuture<Integer> retrievePayload(ResourceID resourceID) {
+			if (resourceID.equals(specialId)) {
+				return CompletableFuture.completedFuture(specialResponse);
+			} else {
+				return CompletableFuture.completedFuture(defaultResponse);
+			}
+		}
+	}
+
 	static class TestingHeartbeatListener implements HeartbeatListener<Object, Object>
{
 
 		private final CompletableFuture<ResourceID> future = new CompletableFuture<>();
@@ -378,7 +535,7 @@ public class HeartbeatManagerTest extends TestLogger {
 		}
 
 		@Override
-		public CompletableFuture<Object> retrievePayload() {
+		public CompletableFuture<Object> retrievePayload(ResourceID resourceID) {
 			return CompletableFuture.completedFuture(payload);
 		}
 	}


Mime
View raw message