flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-7093] [tm] Send SlotReport as part of the heartbeat payload to the ResourceManager
Date Wed, 05 Jul 2017 10:24:54 GMT
Repository: flink
Updated Branches:
  refs/heads/master 3fc96cd1f -> 1ba1260a8


[FLINK-7093] [tm] Send SlotReport as part of the heartbeat payload to the ResourceManager

The TaskManager sends the SlotReport as part of the heartbeat payload to the ResourceManager.
That way, the ResourceManager can sync its internal view on the slot allocation with the actual
allocation state.

This closes #4251.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1ba1260a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1ba1260a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1ba1260a

Branch: refs/heads/master
Commit: 1ba1260a8f3c782dbbe51f060ccea66f10ecc3d3
Parents: 3fc96cd
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Jul 4 10:21:52 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Wed Jul 5 12:23:28 2017 +0200

----------------------------------------------------------------------
 .../resourcemanager/ResourceManager.java        |  27 +++-
 .../resourcemanager/ResourceManagerGateway.java |   3 +-
 .../runtime/taskexecutor/TaskExecutor.java      |  36 +++--
 .../flink/runtime/rpc/TestingRpcService.java    |   7 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 148 ++++++++++++++++++-
 5 files changed, 195 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1ba1260a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index dac053a..d2e0222 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -111,7 +111,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	private final HighAvailabilityServices highAvailabilityServices;
 
 	/** The heartbeat manager with task managers. */
-	private final HeartbeatManager<Void, Void> taskManagerHeartbeatManager;
+	private final HeartbeatManager<SlotReport, Void> taskManagerHeartbeatManager;
 
 	/** The heartbeat manager with job managers. */
 	private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
@@ -466,8 +466,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	}
 
 	@RpcMethod
-	public void heartbeatFromTaskManager(final ResourceID resourceID) {
-		taskManagerHeartbeatManager.receiveHeartbeat(resourceID, null);
+	public void heartbeatFromTaskManager(final ResourceID resourceID, final SlotReport slotReport) {
+		taskManagerHeartbeatManager.receiveHeartbeat(resourceID, slotReport);
 	}
 
 	@RpcMethod
@@ -956,7 +956,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		}
 	}
 
-	private class TaskManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
+	private class TaskManagerHeartbeatListener implements HeartbeatListener<SlotReport, Void> {
 
 		@Override
 		public void notifyHeartbeatTimeout(final ResourceID resourceID) {
@@ -973,8 +973,23 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		}
 
 		@Override
-		public void reportPayload(ResourceID resourceID, Void payload) {
-			// nothing to do since there is no payload
+		public void reportPayload(final ResourceID resourceID, final SlotReport slotReport) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					log.debug("Received new slot report from TaskManager {}.", resourceID);
+
+					final WorkerRegistration<WorkerType> workerRegistration = taskExecutors.get(resourceID);
+
+					if (workerRegistration == null) {
+						log.debug("Received slot report from TaskManager {} which is no longer registered.", resourceID);
+					} else {
+						InstanceID instanceId = workerRegistration.getInstanceID();
+
+						slotManager.reportSlotStatus(instanceId, slotReport);
+					}
+				}
+			});
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/1ba1260a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index fcbedcb..eb091c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -139,8 +139,9 @@ public interface ResourceManagerGateway extends RpcGateway {
 	 * Sends the heartbeat to resource manager from task manager
 	 *
 	 * @param heartbeatOrigin unique id of the task manager
+	 * @param slotReport Current slot allocation on the originating TaskManager
 	 */
-	void heartbeatFromTaskManager(final ResourceID heartbeatOrigin);
+	void heartbeatFromTaskManager(final ResourceID heartbeatOrigin, final SlotReport slotReport);
 
 	/**
 	 * Sends the heartbeat to resource manager from job manager

http://git-wip-us.apache.org/repos/asf/flink/blob/1ba1260a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index a919065..13153c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -99,6 +99,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -140,7 +141,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
 
 	/** The heartbeat manager for resource manager in the task manager */
-	private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
+	private final HeartbeatManager<Void, SlotReport> resourceManagerHeartbeatManager;
 
 	/** The fatal error handler to use in case of a fatal error */
 	private final FatalErrorHandler fatalErrorHandler;
@@ -215,10 +216,10 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			log);
 
 		this.resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
-				getResourceID(),
-				new ResourceManagerHeartbeatListener(),
-				rpcService.getScheduledExecutor(),
-				log);
+			getResourceID(),
+			new ResourceManagerHeartbeatListener(),
+			rpcService.getScheduledExecutor(),
+			log);
 	}
 
 	// ------------------------------------------------------------------------
@@ -703,15 +704,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 	private void establishResourceManagerConnection(ResourceID resourceManagerResourceId) {
 		// monitor the resource manager as heartbeat target
-		resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<Void>() {
+		resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<SlotReport>() {
 			@Override
-			public void receiveHeartbeat(ResourceID resourceID, Void payload) {
+			public void receiveHeartbeat(ResourceID resourceID, SlotReport slotReport) {
 				ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
-				resourceManagerGateway.heartbeatFromTaskManager(resourceID);
+				resourceManagerGateway.heartbeatFromTaskManager(resourceID, slotReport);
 			}
 
 			@Override
-			public void requestHeartbeat(ResourceID resourceID, Void payload) {
+			public void requestHeartbeat(ResourceID resourceID, SlotReport slotReport) {
 				// the TaskManager won't send heartbeat requests to the ResourceManager
 			}
 		});
@@ -1137,6 +1138,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		return resourceManagerConnection;
 	}
 
+	@VisibleForTesting
+	HeartbeatManager<Void, SlotReport> getResourceManagerHeartbeatManager() {
+		return resourceManagerHeartbeatManager;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utility classes
 	// ------------------------------------------------------------------------
@@ -1321,7 +1327,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 	}
 
-	private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
+	private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, SlotReport> {
 
 		@Override
 		public void notifyHeartbeatTimeout(final ResourceID resourceId) {
@@ -1343,8 +1349,14 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 
 		@Override
-		public Future<Void> retrievePayload() {
-			return FlinkCompletableFuture.completed(null);
+		public Future<SlotReport> retrievePayload() {
+			return callAsync(
+				new Callable<SlotReport>() {
+					@Override
+					public SlotReport call() throws Exception {
+						return taskSlotTable.createSlotReport(getResourceID());
+					}
+				}, taskManagerConfiguration.getTimeout());
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1ba1260a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index 47c9e24..b56bf6b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -25,14 +25,13 @@ import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 
-import java.net.UnknownHostException;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * An RPC Service implementation for testing. This RPC service acts as a replacement for
- * teh regular RPC service for cases where tests need to return prepared mock gateways instead of
+ * the regular RPC service for cases where tests need to return prepared mock gateways instead of
  * proper RPC gateways.
  * 
  * <p>The TestingRpcService can be used for example in the following fashion,
@@ -58,14 +57,14 @@ public class TestingRpcService extends AkkaRpcService {
 	/**
 	 * Creates a new {@code TestingRpcService}. 
 	 */
-	public TestingRpcService() throws UnknownHostException {
+	public TestingRpcService() {
 		this(new Configuration());
 	}
 
 	/**
 	 * Creates a new {@code TestingRpcService}, using the given configuration. 
 	 */
-	public TestingRpcService(Configuration configuration) throws UnknownHostException {
+	public TestingRpcService(Configuration configuration) {
 		super(AkkaUtils.createLocalActorSystem(configuration), Time.seconds(10));
 
 		this.registeredConnections = new ConcurrentHashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/1ba1260a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index cc79c5d..b596f75 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -41,8 +41,10 @@ import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.heartbeat.HeartbeatListener;
+import org.apache.flink.runtime.heartbeat.HeartbeatManager;
 import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -65,6 +67,7 @@ import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
@@ -81,6 +84,7 @@ import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -95,6 +99,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -259,13 +264,13 @@ public class TaskExecutorTest extends TestLogger {
 			any(HeartbeatListener.class),
 			any(ScheduledExecutor.class),
 			any(Logger.class))).thenAnswer(
-			new Answer<HeartbeatManagerImpl<Void, Void>>() {
+			new Answer<HeartbeatManagerImpl<SlotReport, Void>>() {
 				@Override
-				public HeartbeatManagerImpl<Void, Void> answer(InvocationOnMock invocation) throws Throwable {
+				public HeartbeatManagerImpl<SlotReport, Void> answer(InvocationOnMock invocation) throws Throwable {
 					return new HeartbeatManagerImpl<>(
 						heartbeatTimeout,
 						taskManagerLocation.getResourceID(),
-						(HeartbeatListener<Void, Void>)invocation.getArguments()[1],
+						(HeartbeatListener<SlotReport, Void>)invocation.getArguments()[1],
 						(Executor)invocation.getArguments()[2],
 						(ScheduledExecutor)invocation.getArguments()[2],
 						(Logger)invocation.getArguments()[3]);
@@ -312,6 +317,143 @@ public class TaskExecutorTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that the correct slot report is sent as part of the heartbeat response.
+	 */
+	@Test
+	public void testHeartbeatSlotReporting() throws Exception {
+		final long timeout = 1000L;
+		final String rmAddress = "rm";
+		final String tmAddress = "tm";
+		final ResourceID rmResourceId = new ResourceID(rmAddress);
+		final ResourceID tmResourceId = new ResourceID(tmAddress);
+		final UUID rmLeaderId = UUID.randomUUID();
+
+		// register the mock resource manager gateway
+		ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
+		when(rmGateway.registerTaskExecutor(
+			any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+			.thenReturn(
+				FlinkCompletableFuture.<RegistrationResponse>completed(
+					new TaskExecutorRegistrationSuccess(
+						new InstanceID(),
+						rmResourceId,
+						10L)));
+
+		final TestingRpcService rpc = new TestingRpcService();
+		rpc.registerGateway(rmAddress, rmGateway);
+
+		final TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService(
+			null,
+			null);
+		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+		haServices.setResourceManagerLeaderRetriever(testLeaderService);
+
+		final TaskManagerConfiguration taskManagerConfiguration = mock(TaskManagerConfiguration.class);
+		when(taskManagerConfiguration.getNumberSlots()).thenReturn(1);
+		when(taskManagerConfiguration.getTimeout()).thenReturn(Time.seconds(10L));
+
+		final TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
+		when(taskManagerLocation.getResourceID()).thenReturn(tmResourceId);
+
+		final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
+		final SlotID slotId = new SlotID(tmResourceId, 0);
+		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
+		final SlotReport slotReport1 = new SlotReport(
+			new SlotStatus(
+				slotId,
+				resourceProfile));
+		final SlotReport slotReport2 = new SlotReport(
+			new SlotStatus(
+				slotId,
+				resourceProfile,
+				new JobID(),
+				new AllocationID()));
+
+		when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport1, slotReport2);
+
+		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+		final long heartbeatTimeout = 10000L;
+		final HeartbeatServices heartbeatServices = mock(HeartbeatServices.class);
+
+		when(heartbeatServices.createHeartbeatManager(
+			eq(taskManagerLocation.getResourceID()),
+			any(HeartbeatListener.class),
+			any(ScheduledExecutor.class),
+			any(Logger.class))).thenAnswer(
+			new Answer<HeartbeatManagerImpl<SlotReport, Void>>() {
+				@Override
+				public HeartbeatManagerImpl<SlotReport, Void> answer(InvocationOnMock invocation) throws Throwable {
+					return spy(new HeartbeatManagerImpl<>(
+						heartbeatTimeout,
+						taskManagerLocation.getResourceID(),
+						(HeartbeatListener<SlotReport, Void>)invocation.getArguments()[1],
+						(Executor)invocation.getArguments()[2],
+						(ScheduledExecutor)invocation.getArguments()[2],
+						(Logger)invocation.getArguments()[3]));
+				}
+			}
+		);
+
+		try {
+			final TaskExecutor taskManager = new TaskExecutor(
+				rpc,
+				taskManagerConfiguration,
+				taskManagerLocation,
+				mock(MemoryManager.class),
+				mock(IOManager.class),
+				mock(NetworkEnvironment.class),
+				haServices,
+				heartbeatServices,
+				mock(MetricRegistry.class),
+				mock(TaskManagerMetricGroup.class),
+				mock(BroadcastVariableManager.class),
+				mock(FileCache.class),
+				taskSlotTable,
+				mock(JobManagerTable.class),
+				mock(JobLeaderService.class),
+				testingFatalErrorHandler);
+
+			taskManager.start();
+
+			// wait for spied heartbeat manager instance
+			HeartbeatManager<Void, SlotReport> heartbeatManager = taskManager.getResourceManagerHeartbeatManager();
+
+			// define a leader and see that a registration happens
+			testLeaderService.notifyListener(rmAddress, rmLeaderId);
+
+			// register resource manager success will trigger monitoring heartbeat target between tm and rm
+			verify(rmGateway, timeout(timeout).atLeast(1)).registerTaskExecutor(
+				eq(rmLeaderId), eq(taskManager.getAddress()), eq(tmResourceId), eq(slotReport1), any(Time.class));
+
+			verify(heartbeatManager, timeout(timeout)).monitorTarget(any(ResourceID.class), any(HeartbeatTarget.class));
+
+			TaskExecutorGateway taskExecutorGateway = taskManager.getSelf();
+
+			// trigger the heartbeat asynchronously
+			taskExecutorGateway.heartbeatFromResourceManager(rmResourceId);
+
+			ArgumentCaptor<SlotReport> slotReportArgumentCaptor = ArgumentCaptor.forClass(SlotReport.class);
+
+			// wait for heartbeat response
+			verify(rmGateway, timeout(timeout)).heartbeatFromTaskManager(
+				eq(taskManagerLocation.getResourceID()),
+				slotReportArgumentCaptor.capture());
+
+			SlotReport actualSlotReport = slotReportArgumentCaptor.getValue();
+
+			// the new slot report should be reported
+			assertEquals(slotReport2, actualSlotReport);
+
+			// check if a concurrent error occurred
+			testingFatalErrorHandler.rethrowError();
+
+		} finally {
+			rpc.stopService();
+		}
+	}
+
 	@Test
 	public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
 		final ResourceID resourceID = ResourceID.generate();


Mime
View raw message