Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E2AA020049B for ; Mon, 14 Aug 2017 12:32:13 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E12DC164D8A; Mon, 14 Aug 2017 10:32:13 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8C1E0164D87 for ; Mon, 14 Aug 2017 12:32:11 +0200 (CEST) Received: (qmail 29317 invoked by uid 500); 14 Aug 2017 10:32:10 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 29301 invoked by uid 99); 14 Aug 2017 10:32:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 14 Aug 2017 10:32:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6F659E00C5; Mon, 14 Aug 2017 10:32:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Date: Mon, 14 Aug 2017 10:32:11 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] flink git commit: [FLINK-6180] [rpc] Remove TestingSerialRpcService archived-at: Mon, 14 Aug 2017 10:32:14 -0000 [FLINK-6180] [rpc] Remove TestingSerialRpcService The TestingSerialRpcService produces thread interleavings which are not happening when being executed with a proper RpcService implementation. Due to this the test cases can fail or succeed wrongly. 
In order to avoid this problem, this commit removes the TestingSerialRpcService and adapts all existing tests which used it before. Remove TestingSerialRpcService from MesosResourceManagerTest Remove TestingSerialRpcService from ResourceManagerJobMasterTest Remove TestingSerialRpcService from ResourceManagerTaskExecutorTest Remove TestingSerialRpcService from ResourceManagerTest Remove TestingSerialRpcService from JobMasterTest Remove TestingSerialRpcService from TaskExecutorITCase Remove TestingSerialRpcService from TaskExecutorTest Remove TestingSerialRpcService from SlotPoolTest Delete TestingSerialRpcService This closes #4516. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c84f8b9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c84f8b9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c84f8b9 Branch: refs/heads/master Commit: 3c84f8b9b9fa36d6fbb2d340015ed151a9771e1f Parents: 4c0fa6e Author: Till Rohrmann Authored: Thu Aug 10 14:07:09 2017 +0200 Committer: Till Rohrmann Committed: Mon Aug 14 12:23:17 2017 +0200 ---------------------------------------------------------------------- .../MesosResourceManagerTest.java | 13 +- .../apache/flink/runtime/instance/SlotPool.java | 4 +- .../flink/runtime/instance/SlotPoolGateway.java | 3 +- .../resourcemanager/ResourceManager.java | 2 - .../flink/runtime/rpc/akka/AkkaRpcActor.java | 3 + .../taskexecutor/slot/TaskSlotTable.java | 2 - .../clusterframework/ResourceManagerTest.java | 31 +- .../flink/runtime/instance/SlotPoolTest.java | 334 ++++++++------ .../flink/runtime/jobmaster/JobMasterTest.java | 27 +- .../resourcemanager/ResourceManagerHATest.java | 31 +- .../ResourceManagerJobMasterTest.java | 33 +- .../ResourceManagerTaskExecutorTest.java | 25 +- .../runtime/rpc/TestingSerialRpcService.java | 440 ------------------- .../taskexecutor/TaskExecutorITCase.java | 21 +- .../runtime/taskexecutor/TaskExecutorTest.java | 90 ++-- 15 files 
changed, 382 insertions(+), 677 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java index f9e35a9..e81a2de 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java @@ -58,7 +58,7 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceManagerActio import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.TestingSerialRpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; @@ -205,14 +205,14 @@ public class MesosResourceManagerTest extends TestLogger { static class Context implements AutoCloseable { // services - TestingSerialRpcService rpcService; + TestingRpcService rpcService; TestingFatalErrorHandler fatalErrorHandler; MockMesosResourceManagerRuntimeServices rmServices; // RM ResourceManagerConfiguration rmConfiguration; ResourceID rmResourceID; - static final String RM_ADDRESS = "/resourceManager"; + static final String RM_ADDRESS = "resourceManager"; TestingMesosResourceManager resourceManager; // domain objects for test purposes @@ 
-239,7 +239,7 @@ public class MesosResourceManagerTest extends TestLogger { * Create mock RM dependencies. */ Context() throws Exception { - rpcService = new TestingSerialRpcService(); + rpcService = new TestingRpcService(); fatalErrorHandler = new TestingFatalErrorHandler(); rmServices = new MockMesosResourceManagerRuntimeServices(); @@ -300,6 +300,7 @@ public class MesosResourceManagerTest extends TestLogger { public final TestingLeaderElectionService rmLeaderElectionService; public final JobLeaderIdService jobLeaderIdService; public final SlotManager slotManager; + public final CompletableFuture slotManagerStarted; public ResourceManagerActions rmActions; public UUID rmLeaderSessionId; @@ -312,6 +313,7 @@ public class MesosResourceManagerTest extends TestLogger { heartbeatServices = new TestingHeartbeatServices(5L, 5L, scheduledExecutor); metricRegistry = mock(MetricRegistry.class); slotManager = mock(SlotManager.class); + slotManagerStarted = new CompletableFuture<>(); jobLeaderIdService = new JobLeaderIdService( highAvailabilityServices, rpcService.getScheduledExecutor(), @@ -321,6 +323,7 @@ public class MesosResourceManagerTest extends TestLogger { @Override public Object answer(InvocationOnMock invocation) throws Throwable { rmActions = invocation.getArgumentAt(2, ResourceManagerActions.class); + slotManagerStarted.complete(true); return null; } }).when(slotManager).start(any(UUID.class), any(Executor.class), any(ResourceManagerActions.class)); @@ -426,6 +429,7 @@ public class MesosResourceManagerTest extends TestLogger { */ public MesosWorkerStore.Worker allocateWorker(Protos.TaskID taskID, ResourceProfile resourceProfile) throws Exception { when(rmServices.workerStore.newTaskID()).thenReturn(taskID); + rmServices.slotManagerStarted.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); rmServices.rmActions.allocateResource(resourceProfile); MesosWorkerStore.Worker expected = MesosWorkerStore.Worker.newWorker(taskID, resourceProfile); @@ -501,6 +505,7 @@ 
public class MesosResourceManagerTest extends TestLogger { // allocate a worker when(rmServices.workerStore.newTaskID()).thenReturn(task1).thenThrow(new AssertionFailedError()); + rmServices.slotManagerStarted.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); rmServices.rmActions.allocateResource(resourceProfile1); // verify that a new worker was persisted, the internal state was updated, the task router was notified, http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java index de2b3e5..bf3de25 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java @@ -613,7 +613,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { * @param resourceID The id of the TaskManager */ @Override - public void releaseTaskManager(final ResourceID resourceID) { + public CompletableFuture releaseTaskManager(final ResourceID resourceID) { if (registeredTaskManagers.remove(resourceID)) { availableSlots.removeAllForTaskManager(resourceID); @@ -622,6 +622,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { slot.releaseSlot(); } } + + return CompletableFuture.completedFuture(Acknowledge.get()); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java index 8d4f2a5..32a9af5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; @@ -74,7 +75,7 @@ public interface SlotPoolGateway extends RpcGateway { void registerTaskManager(ResourceID resourceID); - void releaseTaskManager(ResourceID resourceID); + CompletableFuture releaseTaskManager(ResourceID resourceID); CompletableFuture offerSlot(AllocatedSlot slot); http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index c2b0590..e8ec0e0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -899,8 +899,6 @@ public abstract class ResourceManager clearState(); slotManager.suspend(); - - leaderSessionId = null; } }); } http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index f557447..d51607e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -189,6 +189,9 @@ class AkkaRpcActor extends UntypedActor { if (rpcMethod != null) { try { + // this supports declaration of anonymous classes + rpcMethod.setAccessible(true); + if (rpcMethod.getReturnType().equals(Void.TYPE)) { // No return value to send back rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()); http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java index 5c51c7c..3634df0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java @@ -291,8 +291,6 @@ public class TaskSlotTable implements TimeoutListener { TaskSlot taskSlot = getTaskSlot(allocationId); if (taskSlot != null) { - LOG.info("Free slot {}.", allocationId, cause); - final JobID jobId = taskSlot.getJobId(); if (taskSlot.markFree()) { http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java index 3ca0327..9ad251b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java @@ -48,9 +48,10 @@ import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; -import org.apache.flink.runtime.rpc.TestingSerialRpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; @@ -64,6 +65,8 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + import scala.Option; import java.util.ArrayList; @@ -93,6 +96,8 @@ public class ResourceManagerTest extends TestLogger { private static Configuration config = new Configuration(); + private final Time timeout = Time.seconds(10L); + private TestingHighAvailabilityServices highAvailabilityServices; private TestingLeaderRetrievalService jobManagerLeaderRetrievalService; @@ -479,7 +484,7 @@ public class ResourceManagerTest extends TestLogger { final ResourceID resourceManagerResourceID = ResourceID.generate(); final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class); - final TestingSerialRpcService rpcService = new TestingSerialRpcService(); + final TestingRpcService rpcService = new TestingRpcService(); rpcService.registerGateway(taskManagerAddress, taskExecutorGateway); final ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( @@ -519,18 +524,20 @@ public class ResourceManagerTest extends TestLogger { resourceManager.start(); + final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); + final UUID rmLeaderSessionId = UUID.randomUUID(); rmLeaderElectionService.isLeader(rmLeaderSessionId); final SlotReport slotReport = new SlotReport(); // test registration response successful and it will trigger monitor heartbeat target, schedule heartbeat request at interval time - CompletableFuture successfulFuture = resourceManager.registerTaskExecutor( + CompletableFuture successfulFuture = rmGateway.registerTaskExecutor( rmLeaderSessionId, taskManagerAddress, taskManagerResourceID, slotReport, - Time.milliseconds(0L)); - RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS); + timeout); + RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); assertTrue(response instanceof TaskExecutorRegistrationSuccess); ArgumentCaptor heartbeatRunnableCaptor = ArgumentCaptor.forClass(Runnable.class); @@ -557,7 +564,7 @@ public class ResourceManagerTest extends TestLogger { // run the timeout runnable to simulate a heartbeat timeout timeoutRunnable.run(); - verify(taskExecutorGateway).disconnectResourceManager(any(TimeoutException.class)); + verify(taskExecutorGateway, Mockito.timeout(timeout.toMilliseconds())).disconnectResourceManager(any(TimeoutException.class)); } finally { rpcService.stopService(); @@ -575,7 +582,7 @@ public class ResourceManagerTest extends TestLogger { final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class); - final 
TestingSerialRpcService rpcService = new TestingSerialRpcService(); + final TestingRpcService rpcService = new TestingRpcService(); rpcService.registerGateway(jobMasterAddress, jobMasterGateway); final ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( @@ -620,17 +627,19 @@ public class ResourceManagerTest extends TestLogger { resourceManager.start(); + final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); + rmLeaderElectionService.isLeader(rmLeaderId); // test registration response successful and it will trigger monitor heartbeat target, schedule heartbeat request at interval time - CompletableFuture successfulFuture = resourceManager.registerJobManager( + CompletableFuture successfulFuture = rmGateway.registerJobManager( rmLeaderId, jmLeaderId, jmResourceId, jobMasterAddress, jobId, - Time.milliseconds(0L)); - RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS); + timeout); + RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); assertTrue(response instanceof JobMasterRegistrationSuccess); ArgumentCaptor heartbeatRunnableCaptor = ArgumentCaptor.forClass(Runnable.class); @@ -657,7 +666,7 @@ public class ResourceManagerTest extends TestLogger { // run the timeout runnable to simulate a heartbeat timeout timeoutRunnable.run(); - verify(jobMasterGateway).disconnectResourceManager(eq(jmLeaderId), eq(rmLeaderId), any(TimeoutException.class)); + verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).disconnectResourceManager(eq(jmLeaderId), eq(rmLeaderId), any(TimeoutException.class)); } finally { rpcService.stopService(); http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java index aeceb59..68c43f8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java @@ -28,15 +28,15 @@ import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.SlotRequest; -import org.apache.flink.runtime.rpc.MainThreadValidatorUtil; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.TestingSerialRpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.TestLogger; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import java.util.List; import java.util.UUID; @@ -51,218 +51,268 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.RETURNS_MOCKS; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class SlotPoolTest extends TestLogger { + private final Time timeout = Time.seconds(10L); + private RpcService rpcService; private JobID jobId; - private MainThreadValidatorUtil mainThreadValidatorUtil; - - private SlotPool slotPool; - - private ResourceManagerGateway resourceManagerGateway; - @Before public void setUp() throws Exception { - - this.rpcService = new TestingSerialRpcService(); + this.rpcService = new TestingRpcService(); this.jobId = new JobID(); - this.slotPool = new 
SlotPool(rpcService, jobId); - - this.mainThreadValidatorUtil = new MainThreadValidatorUtil(slotPool); - - mainThreadValidatorUtil.enterMainThread(); - - final String jobManagerAddress = "foobar"; - - slotPool.start(UUID.randomUUID(), jobManagerAddress); - - this.resourceManagerGateway = mock(ResourceManagerGateway.class); - when(resourceManagerGateway - .requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class))) - .thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS)); - - slotPool.connectToResourceManager(UUID.randomUUID(), resourceManagerGateway); } @After public void tearDown() throws Exception { - mainThreadValidatorUtil.exitMainThread(); + rpcService.stopService(); } @Test public void testAllocateSimpleSlot() throws Exception { - ResourceID resourceID = new ResourceID("resource"); - slotPool.registerTaskManager(resourceID); - - ScheduledUnit task = mock(ScheduledUnit.class); - CompletableFuture future = slotPool.allocateSlot(task, DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L)); - assertFalse(future.isDone()); - - ArgumentCaptor slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); - verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); - - final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); - - AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); - assertTrue(slotPool.offerSlot(allocatedSlot).get()); - - SimpleSlot slot = future.get(1, TimeUnit.SECONDS); - assertTrue(future.isDone()); - assertTrue(slot.isAlive()); - assertEquals(resourceID, slot.getTaskManagerID()); - assertEquals(jobId, slot.getJobID()); - assertEquals(slotPool.getSlotOwner(), slot.getOwner()); - assertEquals(slotPool.getAllocatedSlots().get(slot.getAllocatedSlot().getSlotAllocationId()), slot); + ResourceManagerGateway resourceManagerGateway = 
createResourceManagerGatewayMock(); + final SlotPool slotPool = new SlotPool(rpcService, jobId); + + try { + SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); + ResourceID resourceID = new ResourceID("resource"); + slotPoolGateway.registerTaskManager(resourceID); + + ScheduledUnit task = mock(ScheduledUnit.class); + CompletableFuture future = slotPoolGateway.allocateSlot(task, DEFAULT_TESTING_PROFILE, null, timeout); + assertFalse(future.isDone()); + + ArgumentCaptor slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); + verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); + + final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); + + AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); + assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); + + SimpleSlot slot = future.get(1, TimeUnit.SECONDS); + assertTrue(future.isDone()); + assertTrue(slot.isAlive()); + assertEquals(resourceID, slot.getTaskManagerID()); + assertEquals(jobId, slot.getJobID()); + assertEquals(slotPool.getSlotOwner(), slot.getOwner()); + assertEquals(slotPool.getAllocatedSlots().get(slot.getAllocatedSlot().getSlotAllocationId()), slot); + } finally { + slotPool.shutDown(); + } } @Test public void testAllocationFulfilledByReturnedSlot() throws Exception { - ResourceID resourceID = new ResourceID("resource"); - slotPool.registerTaskManager(resourceID); + ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock(); + final SlotPool slotPool = new SlotPool(rpcService, jobId); + + try { + SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); + ResourceID resourceID = new ResourceID("resource"); + slotPool.registerTaskManager(resourceID); - CompletableFuture future1 = 
slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L)); - CompletableFuture future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L)); + CompletableFuture future1 = slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + CompletableFuture future2 = slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); - assertFalse(future1.isDone()); - assertFalse(future2.isDone()); + assertFalse(future1.isDone()); + assertFalse(future2.isDone()); - ArgumentCaptor slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); - verify(resourceManagerGateway, times(2)) - .requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); + ArgumentCaptor slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); + verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds()).times(2)) + .requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); - final List slotRequests = slotRequestArgumentCaptor.getAllValues(); + final List slotRequests = slotRequestArgumentCaptor.getAllValues(); - AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequests.get(0).getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); - assertTrue(slotPool.offerSlot(allocatedSlot).get()); + AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequests.get(0).getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); + assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); - SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS); - assertTrue(future1.isDone()); - assertFalse(future2.isDone()); + SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS); + assertTrue(future1.isDone()); + assertFalse(future2.isDone()); - // return this slot to pool - slot1.releaseSlot(); + // return this slot to pool + 
slot1.releaseSlot(); - // second allocation fulfilled by previous slot returning - SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS); - assertTrue(future2.isDone()); + // second allocation fulfilled by previous slot returning + SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS); + assertTrue(future2.isDone()); - assertNotEquals(slot1, slot2); - assertTrue(slot1.isReleased()); - assertTrue(slot2.isAlive()); - assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID()); - assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber()); - assertEquals(slotPool.getAllocatedSlots().get(slot1.getAllocatedSlot().getSlotAllocationId()), slot2); + assertNotEquals(slot1, slot2); + assertTrue(slot1.isReleased()); + assertTrue(slot2.isAlive()); + assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID()); + assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber()); + assertEquals(slotPool.getAllocatedSlots().get(slot1.getAllocatedSlot().getSlotAllocationId()), slot2); + } finally { + slotPool.shutDown(); + } } @Test public void testAllocateWithFreeSlot() throws Exception { - ResourceID resourceID = new ResourceID("resource"); - slotPool.registerTaskManager(resourceID); + ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock(); + final SlotPool slotPool = new SlotPool(rpcService, jobId); - CompletableFuture future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L)); - assertFalse(future1.isDone()); + try { + SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); + ResourceID resourceID = new ResourceID("resource"); + slotPoolGateway.registerTaskManager(resourceID); - ArgumentCaptor slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); - verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); + CompletableFuture future1 = 
slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + assertFalse(future1.isDone()); - final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); + ArgumentCaptor slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); + verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); - AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); - assertTrue(slotPool.offerSlot(allocatedSlot).get()); + final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); - SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS); - assertTrue(future1.isDone()); + AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); + assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); - // return this slot to pool - slot1.releaseSlot(); + SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS); + assertTrue(future1.isDone()); - CompletableFuture future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L)); + // return this slot to pool + slot1.releaseSlot(); - // second allocation fulfilled by previous slot returning - SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS); - assertTrue(future2.isDone()); + CompletableFuture future2 = slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); - assertNotEquals(slot1, slot2); - assertTrue(slot1.isReleased()); - assertTrue(slot2.isAlive()); - assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID()); - assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber()); + // second allocation fulfilled by previous slot returning + SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS); + assertTrue(future2.isDone()); + + 
assertNotEquals(slot1, slot2); + assertTrue(slot1.isReleased()); + assertTrue(slot2.isAlive()); + assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID()); + assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber()); + } finally { + slotPool.shutDown(); + } } @Test public void testOfferSlot() throws Exception { - ResourceID resourceID = new ResourceID("resource"); - slotPool.registerTaskManager(resourceID); + ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock(); + final SlotPool slotPool = new SlotPool(rpcService, jobId); + + try { + SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); + ResourceID resourceID = new ResourceID("resource"); + slotPoolGateway.registerTaskManager(resourceID); - CompletableFuture future = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L)); - assertFalse(future.isDone()); + CompletableFuture future = slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + assertFalse(future.isDone()); - ArgumentCaptor slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); - verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); + ArgumentCaptor slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); + verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); - final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); + final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); - // slot from unregistered resource - AllocatedSlot invalid = createAllocatedSlot(new ResourceID("unregistered"), slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); - assertFalse(slotPool.offerSlot(invalid).get()); + // slot from unregistered resource + 
AllocatedSlot invalid = createAllocatedSlot(new ResourceID("unregistered"), slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); + assertFalse(slotPoolGateway.offerSlot(invalid).get()); - AllocatedSlot notRequested = createAllocatedSlot(resourceID, new AllocationID(), jobId, DEFAULT_TESTING_PROFILE); + AllocatedSlot notRequested = createAllocatedSlot(resourceID, new AllocationID(), jobId, DEFAULT_TESTING_PROFILE); - // we'll also accept non requested slots - assertTrue(slotPool.offerSlot(notRequested).get()); + // we'll also accept non requested slots + assertTrue(slotPoolGateway.offerSlot(notRequested).get()); - AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); + AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); - // accepted slot - assertTrue(slotPool.offerSlot(allocatedSlot).get()); - SimpleSlot slot = future.get(1, TimeUnit.SECONDS); - assertTrue(future.isDone()); - assertTrue(slot.isAlive()); + // accepted slot + assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); + SimpleSlot slot = future.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + assertTrue(slot.isAlive()); - // duplicated offer with using slot - assertTrue(slotPool.offerSlot(allocatedSlot).get()); - assertTrue(future.isDone()); - assertTrue(slot.isAlive()); + // duplicated offer with using slot + assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); + assertTrue(slot.isAlive()); - // duplicated offer with free slot - slot.releaseSlot(); - assertTrue(slot.isReleased()); - assertTrue(slotPool.offerSlot(allocatedSlot).get()); + // duplicated offer with free slot + slot.releaseSlot(); + assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); + } finally { + slotPool.shutDown(); + } } @Test public void testReleaseResource() throws Exception { - ResourceID resourceID = new ResourceID("resource"); - 
slotPool.registerTaskManager(resourceID); + ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock(); + + final CompletableFuture slotReturnFuture = new CompletableFuture<>(); + + final SlotPool slotPool = new SlotPool(rpcService, jobId) { + @Override + public void returnAllocatedSlot(Slot slot) { + super.returnAllocatedSlot(slot); + + slotReturnFuture.complete(true); + } + }; + + try { + SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); + ResourceID resourceID = new ResourceID("resource"); + slotPoolGateway.registerTaskManager(resourceID); + + CompletableFuture future1 = slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + + ArgumentCaptor slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); + verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); + + final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); + + CompletableFuture future2 = slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + + AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); + assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); - CompletableFuture future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L)); + SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS); + assertTrue(future1.isDone()); + assertFalse(future2.isDone()); - ArgumentCaptor slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); - verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); + slotPoolGateway.releaseTaskManager(resourceID); - final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); + 
// wait until the slot has been returned + slotReturnFuture.get(); - CompletableFuture future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L)); + assertTrue(slot1.isReleased()); - AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); - assertTrue(slotPool.offerSlot(allocatedSlot).get()); + // slot released and not usable, second allocation still not fulfilled + Thread.sleep(10); + assertFalse(future2.isDone()); + } finally { + slotPool.shutDown(); + } + } + + private static ResourceManagerGateway createResourceManagerGatewayMock() { + ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class); + when(resourceManagerGateway + .requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class))) + .thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS)); - SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS); - assertTrue(future1.isDone()); - assertFalse(future2.isDone()); + return resourceManagerGateway; + } - slotPool.releaseTaskManager(resourceID); - assertTrue(slot1.isReleased()); + private static SlotPoolGateway setupSlotPool( + SlotPool slotPool, + ResourceManagerGateway resourceManagerGateway) throws Exception { + final String jobManagerAddress = "foobar"; + + slotPool.start(UUID.randomUUID(), jobManagerAddress); + + slotPool.connectToResourceManager(UUID.randomUUID(), resourceManagerGateway); - // slot released and not usable, second allocation still not fulfilled - Thread.sleep(10); - assertFalse(future2.isDone()); + return slotPool.getSelfGateway(SlotPoolGateway.class); } static AllocatedSlot createAllocatedSlot( http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 0c4d376..df35369 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -33,8 +33,9 @@ import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; -import org.apache.flink.runtime.rpc.TestingSerialRpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.TestingFatalErrorHandler; @@ -62,6 +63,8 @@ import static org.mockito.Mockito.when; @PrepareForTest(BlobLibraryCacheManager.class) public class JobMasterTest extends TestLogger { + private final Time testingTimeout = Time.seconds(10L); + @Test public void testHeartbeatTimeoutWithTaskManager() throws Exception { final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); @@ -81,7 +84,7 @@ public class JobMasterTest extends TestLogger { final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234); final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - final TestingSerialRpcService rpc = new TestingSerialRpcService(); + final TestingRpcService rpc = new TestingRpcService(); rpc.registerGateway(taskManagerAddress, taskExecutorGateway); final long heartbeatInterval = 1L; @@ 
-89,6 +92,8 @@ public class JobMasterTest extends TestLogger { final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor); + final BlobLibraryCacheManager libraryCacheManager = mock(BlobLibraryCacheManager.class); + when(libraryCacheManager.getBlobServerPort()).thenReturn(1337); final JobGraph jobGraph = new JobGraph(); @@ -101,7 +106,7 @@ public class JobMasterTest extends TestLogger { haServices, heartbeatServices, Executors.newScheduledThreadPool(1), - mock(BlobLibraryCacheManager.class), + libraryCacheManager, mock(RestartStrategyFactory.class), Time.of(10, TimeUnit.SECONDS), null, @@ -111,8 +116,14 @@ public class JobMasterTest extends TestLogger { jobMaster.start(jmLeaderId); + final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + // register task manager will trigger monitor heartbeat target, schedule heartbeat request at interval time - jobMaster.registerTaskManager(taskManagerAddress, taskManagerLocation, jmLeaderId, Time.milliseconds(0L)); + CompletableFuture registrationResponse = jobMasterGateway + .registerTaskManager(taskManagerAddress, taskManagerLocation, jmLeaderId, testingTimeout); + + // wait for the completion of the registration + registrationResponse.get(); ArgumentCaptor heartbeatRunnableCaptor = ArgumentCaptor.forClass(Runnable.class); verify(scheduledExecutor, times(1)).scheduleAtFixedRate( @@ -124,7 +135,7 @@ public class JobMasterTest extends TestLogger { Runnable heartbeatRunnable = heartbeatRunnableCaptor.getValue(); ArgumentCaptor timeoutRunnableCaptor = ArgumentCaptor.forClass(Runnable.class); - verify(scheduledExecutor).schedule(timeoutRunnableCaptor.capture(), eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS)); + verify(scheduledExecutor, timeout(testingTimeout.toMilliseconds())).schedule(timeoutRunnableCaptor.capture(), eq(heartbeatTimeout), 
eq(TimeUnit.MILLISECONDS)); Runnable timeoutRunnable = timeoutRunnableCaptor.getValue(); @@ -136,7 +147,7 @@ public class JobMasterTest extends TestLogger { // run the timeout runnable to simulate a heartbeat timeout timeoutRunnable.run(); - verify(taskExecutorGateway).disconnectJobManager(eq(jobGraph.getJobID()), any(TimeoutException.class)); + verify(taskExecutorGateway, timeout(testingTimeout.toMilliseconds())).disconnectJobManager(eq(jobGraph.getJobID()), any(TimeoutException.class)); // check if a concurrent error occurred testingFatalErrorHandler.rethrowError(); @@ -179,7 +190,7 @@ public class JobMasterTest extends TestLogger { )).thenReturn(CompletableFuture.completedFuture(new JobMasterRegistrationSuccess( heartbeatInterval, rmLeaderId, rmResourceId))); - final TestingSerialRpcService rpc = new TestingSerialRpcService(); + final TestingRpcService rpc = new TestingRpcService(); rpc.registerGateway(resourceManagerAddress, resourceManagerGateway); final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); @@ -207,7 +218,7 @@ public class JobMasterTest extends TestLogger { rmLeaderRetrievalService.notifyListener(resourceManagerAddress, rmLeaderId); // register job manager success will trigger monitor heartbeat target between jm and rm - verify(resourceManagerGateway).registerJobManager( + verify(resourceManagerGateway, timeout(testingTimeout.toMilliseconds())).registerJobManager( eq(rmLeaderId), eq(jmLeaderId), eq(jmResourceId), http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java index 986f848..c213752 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java @@ -27,7 +27,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerConfiguration; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.TestingSerialRpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.TestLogger; @@ -35,6 +35,7 @@ import org.junit.Assert; import org.junit.Test; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import static org.mockito.Mockito.mock; @@ -46,9 +47,17 @@ public class ResourceManagerHATest extends TestLogger { @Test public void testGrantAndRevokeLeadership() throws Exception { ResourceID rmResourceId = ResourceID.generate(); - RpcService rpcService = new TestingSerialRpcService(); + RpcService rpcService = new TestingRpcService(); + + CompletableFuture leaderSessionIdFuture = new CompletableFuture<>(); + + TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService() { + @Override + public void confirmLeaderSessionID(UUID leaderId) { + leaderSessionIdFuture.complete(leaderId); + } + }; - TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService(); TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService); @@ -73,6 +82,8 @@ public class ResourceManagerHATest extends TestLogger { TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); + CompletableFuture 
revokedLeaderIdFuture = new CompletableFuture<>(); + final ResourceManager resourceManager = new StandaloneResourceManager( rpcService, @@ -84,17 +95,25 @@ public class ResourceManagerHATest extends TestLogger { resourceManagerRuntimeServices.getSlotManager(), metricRegistry, resourceManagerRuntimeServices.getJobLeaderIdService(), - testingFatalErrorHandler); + testingFatalErrorHandler) { + + @Override + public void revokeLeadership() { + super.revokeLeadership(); + runAsync( + () -> revokedLeaderIdFuture.complete(getLeaderSessionId())); + } + }; resourceManager.start(); // before grant leadership, resourceManager's leaderId is null Assert.assertEquals(null, resourceManager.getLeaderSessionId()); final UUID leaderId = UUID.randomUUID(); leaderElectionService.isLeader(leaderId); // after grant leadership, resourceManager's leaderId has value - Assert.assertEquals(leaderId, resourceManager.getLeaderSessionId()); + Assert.assertEquals(leaderId, leaderSessionIdFuture.get()); // then revoke leadership, resourceManager's leaderId is null again leaderElectionService.notLeader(); - Assert.assertEquals(null, resourceManager.getLeaderSessionId()); + Assert.assertEquals(null, revokedLeaderIdFuture.get()); if (testingFatalErrorHandler.hasExceptionOccurred()) { testingFatalErrorHandler.rethrowError(); http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java index 10d6a72..139bfc4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java @@ -32,7 +32,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; -import org.apache.flink.runtime.rpc.TestingSerialRpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.TestingFatalErrorHandler; @@ -50,13 +50,13 @@ import static org.mockito.Mockito.*; public class ResourceManagerJobMasterTest extends TestLogger { - private TestingSerialRpcService rpcService; + private TestingRpcService rpcService; - private final Time timeout = Time.milliseconds(0L); + private final Time timeout = Time.seconds(10L); @Before public void setup() throws Exception { - rpcService = new TestingSerialRpcService(); + rpcService = new TestingRpcService(); } @After @@ -77,17 +77,18 @@ public class ResourceManagerJobMasterTest extends TestLogger { TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID); TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler); + final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); // test response successful - CompletableFuture successfulFuture = resourceManager.registerJobManager( + CompletableFuture successfulFuture = rmGateway.registerJobManager( 
rmLeaderSessionId, jmLeaderID, jmResourceId, jobMasterAddress, jobID, timeout); - RegistrationResponse response = successfulFuture.get(5L, TimeUnit.SECONDS); + RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); assertTrue(response instanceof JobMasterRegistrationSuccess); if (testingFatalErrorHandler.hasExceptionOccurred()) { @@ -108,11 +109,12 @@ public class ResourceManagerJobMasterTest extends TestLogger { TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID); TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler); + final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); // test throw exception when receive a registration from job master which takes unmatched leaderSessionId UUID differentLeaderSessionID = UUID.randomUUID(); - CompletableFuture unMatchedLeaderFuture = resourceManager.registerJobManager( + CompletableFuture unMatchedLeaderFuture = rmGateway.registerJobManager( differentLeaderSessionID, jmLeaderID, jmResourceId, @@ -139,13 +141,14 @@ public class ResourceManagerJobMasterTest extends TestLogger { HighAvailabilityServices.DEFAULT_LEADER_ID); TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler); + final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); final UUID rmLeaderSessionId = 
grantResourceManagerLeadership(resourceManagerLeaderElectionService); final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); final ResourceID jmResourceId = new ResourceID(jobMasterAddress); // test throw exception when receive a registration from job master which takes unmatched leaderSessionId UUID differentLeaderSessionID = UUID.randomUUID(); - CompletableFuture unMatchedLeaderFuture = resourceManager.registerJobManager( + CompletableFuture unMatchedLeaderFuture = rmGateway.registerJobManager( rmLeaderSessionId, differentLeaderSessionID, jmResourceId, @@ -172,13 +175,14 @@ public class ResourceManagerJobMasterTest extends TestLogger { HighAvailabilityServices.DEFAULT_LEADER_ID); TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler); + final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); final ResourceID jmResourceId = new ResourceID(jobMasterAddress); // test throw exception when receive a registration from job master which takes invalid address String invalidAddress = "/jobMasterAddress2"; - CompletableFuture invalidAddressFuture = resourceManager.registerJobManager( + CompletableFuture invalidAddressFuture = rmGateway.registerJobManager( rmLeaderSessionId, jmLeaderSessionId, jmResourceId, @@ -204,21 +208,26 @@ public class ResourceManagerJobMasterTest extends TestLogger { "localhost", HighAvailabilityServices.DEFAULT_LEADER_ID); TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); - final ResourceManager resourceManager = 
createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler); + final ResourceManager resourceManager = createAndStartResourceManager( + resourceManagerLeaderElectionService, + jobID, + jobMasterLeaderRetrievalService, + testingFatalErrorHandler); + final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); final ResourceID jmResourceId = new ResourceID(jobMasterAddress); JobID unknownJobIDToHAServices = new JobID(); // verify return RegistrationResponse.Decline when failed to start a job master Leader retrieval listener - CompletableFuture declineFuture = resourceManager.registerJobManager( + CompletableFuture declineFuture = rmGateway.registerJobManager( rmLeaderSessionId, jmLeaderSessionId, jmResourceId, jobMasterAddress, unknownJobIDToHAServices, timeout); - RegistrationResponse response = declineFuture.get(5, TimeUnit.SECONDS); + RegistrationResponse response = declineFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); assertTrue(response instanceof RegistrationResponse.Decline); if (testingFatalErrorHandler.hasExceptionOccurred()) { http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java index fc96f0d..616ed5c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java @@ -27,7 +27,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; -import org.apache.flink.runtime.rpc.TestingSerialRpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; @@ -49,7 +49,9 @@ import static org.mockito.Mockito.mock; public class ResourceManagerTaskExecutorTest extends TestLogger { - private TestingSerialRpcService rpcService; + private final Time timeout = Time.seconds(10L); + + private TestingRpcService rpcService; private SlotReport slotReport = new SlotReport(); @@ -61,19 +63,22 @@ public class ResourceManagerTaskExecutorTest extends TestLogger { private StandaloneResourceManager resourceManager; + private ResourceManagerGateway rmGateway; + private UUID leaderSessionId; private TestingFatalErrorHandler testingFatalErrorHandler; @Before public void setup() throws Exception { - rpcService = new TestingSerialRpcService(); + rpcService = new TestingRpcService(); taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress); resourceManagerResourceID = ResourceID.generate(); TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService(); testingFatalErrorHandler = new TestingFatalErrorHandler(); resourceManager = createAndStartResourceManager(rmLeaderElectionService, testingFatalErrorHandler); + rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); leaderSessionId = grantLeadership(rmLeaderElectionService); } @@ -90,13 +95,13 @@ public class ResourceManagerTaskExecutorTest extends TestLogger 
{ try { // test response successful CompletableFuture successfulFuture = - resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport, Time.milliseconds(0L)); - RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS); + rmGateway.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport, timeout); + RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); assertTrue(response instanceof TaskExecutorRegistrationSuccess); // test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor CompletableFuture duplicateFuture = - resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport, Time.milliseconds(0L)); + rmGateway.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport, timeout); RegistrationResponse duplicateResponse = duplicateFuture.get(); assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess); assertNotEquals(((TaskExecutorRegistrationSuccess) response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId()); @@ -116,8 +121,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger { // test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId UUID differentLeaderSessionID = UUID.randomUUID(); CompletableFuture unMatchedLeaderFuture = - resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID, slotReport, Time.milliseconds(0L)); - assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline); + rmGateway.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID, slotReport, timeout); + 
assertTrue(unMatchedLeaderFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS) instanceof RegistrationResponse.Decline); } finally { if (testingFatalErrorHandler.hasExceptionOccurred()) { testingFatalErrorHandler.rethrowError(); @@ -134,8 +139,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger { // test throw exception when receive a registration from taskExecutor which takes invalid address String invalidAddress = "/taskExecutor2"; CompletableFuture invalidAddressFuture = - resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID, slotReport, Time.milliseconds(0L)); - assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline); + rmGateway.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID, slotReport, timeout); + assertTrue(invalidAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS) instanceof RegistrationResponse.Decline); } finally { if (testingFatalErrorHandler.hasExceptionOccurred()) { testingFatalErrorHandler.rethrowError(); http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java deleted file mode 100644 index cb38f6f..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java +++ /dev/null @@ -1,440 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.concurrent.ScheduledExecutor; -import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; -import org.apache.flink.runtime.util.DirectExecutorService; -import org.apache.flink.util.Preconditions; - -import java.lang.annotation.Annotation; -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; -import java.util.List; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Delayed; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * An RPC Service implementation for testing. This RPC service directly executes all asynchronous - * calls one by one in the calling thread. 
- */ -public class TestingSerialRpcService implements RpcService { - - private final DirectExecutorService executorService; - private final ScheduledExecutorService scheduledExecutorService; - private final ConcurrentHashMap registeredConnections; - private final CompletableFuture terminationFuture; - - private final ScheduledExecutor scheduledExecutorServiceAdapter; - - public TestingSerialRpcService() { - executorService = new DirectExecutorService(); - scheduledExecutorService = new ScheduledThreadPoolExecutor(1); - this.registeredConnections = new ConcurrentHashMap<>(16); - this.terminationFuture = new CompletableFuture<>(); - - this.scheduledExecutorServiceAdapter = new ScheduledExecutorServiceAdapter(scheduledExecutorService); - } - - @Override - public ScheduledFuture scheduleRunnable(final Runnable runnable, final long delay, final TimeUnit unit) { - try { - unit.sleep(delay); - runnable.run(); - - return new DoneScheduledFuture(null); - } catch (Throwable e) { - throw new RuntimeException(e); - } - } - - @Override - public void execute(Runnable runnable) { - runnable.run(); - } - - @Override - public CompletableFuture execute(Callable callable) { - try { - T result = callable.call(); - - return CompletableFuture.completedFuture(result); - } catch (Exception e) { - return FutureUtils.completedExceptionally(e); - } - } - - @Override - public Executor getExecutor() { - return executorService; - } - - public ScheduledExecutor getScheduledExecutor() { - return scheduledExecutorServiceAdapter; - } - - @Override - public void stopService() { - executorService.shutdown(); - - scheduledExecutorService.shutdown(); - - boolean terminated = false; - - try { - terminated = scheduledExecutorService.awaitTermination(1, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - - if (!terminated) { - List runnables = scheduledExecutorService.shutdownNow(); - - for (Runnable runnable : runnables) { - runnable.run(); - } - } - - 
registeredConnections.clear(); - terminationFuture.complete(null); - } - - @Override - public CompletableFuture getTerminationFuture() { - return terminationFuture; - } - - @Override - public void stopServer(RpcServer selfGateway) { - registeredConnections.remove(selfGateway.getAddress()); - } - - @Override - public RpcServer startServer(S rpcEndpoint) { - final String address = UUID.randomUUID().toString(); - - InvocationHandler akkaInvocationHandler = new TestingSerialRpcService.TestingSerialInvocationHandler<>(address, rpcEndpoint); - ClassLoader classLoader = getClass().getClassLoader(); - - Set> implementedRpcGateways = RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()); - - implementedRpcGateways.add(RpcServer.class); - - - @SuppressWarnings("unchecked") - RpcServer rpcServer = (RpcServer) Proxy.newProxyInstance( - classLoader, - implementedRpcGateways.toArray(new Class[implementedRpcGateways.size()]), - akkaInvocationHandler); - - // register self - registeredConnections.putIfAbsent(rpcServer.getAddress(), rpcServer); - - return rpcServer; - } - - @Override - public String getAddress() { - return ""; - } - - @Override - public int getPort() { - return -1; - } - - @Override - public CompletableFuture connect(String address, Class clazz) { - RpcGateway gateway = registeredConnections.get(address); - - if (gateway != null) { - if (clazz.isAssignableFrom(gateway.getClass())) { - @SuppressWarnings("unchecked") - C typedGateway = (C) gateway; - return CompletableFuture.completedFuture(typedGateway); - } else { - return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz)); - } - } else { - return FutureUtils.completedExceptionally(new Exception("No gateway registered under " + address + '.')); - } - } - - // ------------------------------------------------------------------------ - // connections - // ------------------------------------------------------------------------ - - public void 
registerGateway(String address, RpcGateway gateway) { - checkNotNull(address); - checkNotNull(gateway); - - if (registeredConnections.putIfAbsent(address, gateway) != null) { - throw new IllegalStateException("a gateway is already registered under " + address); - } - } - - public void clearGateways() { - registeredConnections.clear(); - } - - private static final class TestingSerialInvocationHandler implements InvocationHandler, RpcGateway, MainThreadExecutable, StartStoppable { - - private final T rpcEndpoint; - - /** default timeout for asks */ - private final Time timeout; - - private final String address; - - private TestingSerialInvocationHandler(String address, T rpcEndpoint) { - this(address, rpcEndpoint, Time.seconds(10)); - } - - private TestingSerialInvocationHandler(String address, T rpcEndpoint, Time timeout) { - this.rpcEndpoint = rpcEndpoint; - this.timeout = timeout; - this.address = address; - } - - @Override - public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { - Class declaringClass = method.getDeclaringClass(); - if (declaringClass.equals(MainThreadExecutable.class) || - declaringClass.equals(Object.class) || - declaringClass.equals(StartStoppable.class) || - declaringClass.equals(RpcServer.class) || - declaringClass.equals(RpcGateway.class)) { - return method.invoke(this, args); - } else { - final String methodName = method.getName(); - Class[] parameterTypes = method.getParameterTypes(); - Annotation[][] parameterAnnotations = method.getParameterAnnotations(); - Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout); - - Class returnType = method.getReturnType(); - - if (returnType.equals(CompletableFuture.class)) { - try { - Object result = handleRpcInvocationSync(methodName, parameterTypes, args, futureTimeout); - return CompletableFuture.completedFuture(result); - } catch (Throwable e) { - return FutureUtils.completedExceptionally(e); - } - } else { - return 
handleRpcInvocationSync(methodName, parameterTypes, args, futureTimeout); - } - } - } - - /** - * Handle rpc invocations by looking up the rpc method on the rpc endpoint and calling this - * method with the provided method arguments. If the method has a return value, it is returned - * to the sender of the call. - */ - private Object handleRpcInvocationSync(final String methodName, - final Class[] parameterTypes, - final Object[] args, - final Time futureTimeout) throws Exception { - final Method rpcMethod = lookupRpcMethod(methodName, parameterTypes); - Object result = rpcMethod.invoke(rpcEndpoint, args); - - if (result instanceof Future) { - Future future = (Future) result; - return future.get(futureTimeout.getSize(), futureTimeout.getUnit()); - } else { - return result; - } - } - - @Override - public void runAsync(Runnable runnable) { - runnable.run(); - } - - @Override - public CompletableFuture callAsync(Callable callable, Time callTimeout) { - try { - return CompletableFuture.completedFuture(callable.call()); - } catch (Throwable e) { - return FutureUtils.completedExceptionally(e); - } - } - - @Override - public void scheduleRunAsync(final Runnable runnable, final long delay) { - try { - TimeUnit.MILLISECONDS.sleep(delay); - runnable.run(); - } catch (Throwable e) { - throw new RuntimeException(e); - } - } - - @Override - public String getAddress() { - return address; - } - - // this is not a real hostname but the address above is also not a real akka RPC address - // and we keep it that way until actually needed by a test case - @Override - public String getHostname() { - return address; - } - - @Override - public void start() { - // do nothing - } - - @Override - public void stop() { - // do nothing - } - - /** - * Look up the rpc method on the given {@link RpcEndpoint} instance. 
- * - * @param methodName Name of the method - * @param parameterTypes Parameter types of the method - * @return Method of the rpc endpoint - * @throws NoSuchMethodException Thrown if the method with the given name and parameter types - * cannot be found at the rpc endpoint - */ - private Method lookupRpcMethod(final String methodName, - final Class[] parameterTypes) throws NoSuchMethodException { - return rpcEndpoint.getClass().getMethod(methodName, parameterTypes); - } - - // ------------------------------------------------------------------------ - // Helper methods - // ------------------------------------------------------------------------ - - /** - * Extracts the {@link RpcTimeout} annotated rpc timeout value from the list of given method - * arguments. If no {@link RpcTimeout} annotated parameter could be found, then the default - * timeout is returned. - * - * @param parameterAnnotations Parameter annotations - * @param args Array of arguments - * @param defaultTimeout Default timeout to return if no {@link RpcTimeout} annotated parameter - * has been found - * @return Timeout extracted from the array of arguments or the default timeout - */ - private static Time extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args, - Time defaultTimeout) { - if (args != null) { - Preconditions.checkArgument(parameterAnnotations.length == args.length); - - for (int i = 0; i < parameterAnnotations.length; i++) { - if (isRpcTimeout(parameterAnnotations[i])) { - if (args[i] instanceof Time) { - return (Time) args[i]; - } else { - throw new RuntimeException("The rpc timeout parameter must be of type " + - Time.class.getName() + ". 
The type " + args[i].getClass().getName() + - " is not supported."); - } - } - } - } - - return defaultTimeout; - } - - /** - * Checks whether any of the annotations is of type {@link RpcTimeout} - * - * @param annotations Array of annotations - * @return True if {@link RpcTimeout} was found; otherwise false - */ - private static boolean isRpcTimeout(Annotation[] annotations) { - for (Annotation annotation : annotations) { - if (annotation.annotationType().equals(RpcTimeout.class)) { - return true; - } - } - - return false; - } - - } - - private static class DoneScheduledFuture implements ScheduledFuture { - - private final V value; - - private DoneScheduledFuture(V value) { - this.value = value; - } - - @Override - public long getDelay(TimeUnit unit) { - return 0L; - } - - @Override - public int compareTo(Delayed o) { - return 0; - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return false; - } - - @Override - public boolean isCancelled() { - return false; - } - - @Override - public boolean isDone() { - return true; - } - - @Override - public V get() throws InterruptedException, ExecutionException { - return value; - } - - @Override - public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return value; - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index 4c87671..90f731d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -37,6 +37,7 @@ import 
org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.registration.RegistrationResponse; @@ -47,7 +48,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; -import org.apache.flink.runtime.rpc.TestingSerialRpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TimerService; @@ -57,6 +58,7 @@ import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.TestLogger; import org.hamcrest.Matchers; import org.junit.Test; +import org.mockito.Mockito; import java.net.InetAddress; import java.util.Arrays; @@ -76,6 +78,8 @@ import static org.mockito.Mockito.when; public class TaskExecutorITCase extends TestLogger { + private final Time timeout = Time.seconds(10L); + @Test public void testSlotAllocation() throws Exception { TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); @@ -98,7 +102,7 @@ public class TaskExecutorITCase extends TestLogger { testingHAServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService); testingHAServices.setJobMasterLeaderRetriever(jobId, new TestingLeaderRetrievalService(jmAddress, jmLeaderId)); - TestingSerialRpcService rpcService = new 
TestingSerialRpcService(); + TestingRpcService rpcService = new TestingRpcService(); ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( Time.milliseconds(500L), Time.milliseconds(500L)); @@ -170,6 +174,7 @@ public class TaskExecutorITCase extends TestLogger { rpcService.registerGateway(rmAddress, resourceManager.getSelfGateway(ResourceManagerGateway.class)); rpcService.registerGateway(jmAddress, jmGateway); + rpcService.registerGateway(taskExecutor.getAddress(), taskExecutor.getSelfGateway(TaskExecutorGateway.class)); final AllocationID allocationId = new AllocationID(); final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, jmAddress); @@ -179,27 +184,31 @@ public class TaskExecutorITCase extends TestLogger { resourceManager.start(); taskExecutor.start(); + final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); + // notify the RM that it is the leader rmLeaderElectionService.isLeader(rmLeaderId); // notify the TM about the new RM leader rmLeaderRetrievalService.notifyListener(rmAddress, rmLeaderId); - CompletableFuture registrationResponseFuture = resourceManager.registerJobManager( + CompletableFuture registrationResponseFuture = rmGateway.registerJobManager( rmLeaderId, jmLeaderId, jmResourceId, jmAddress, jobId, - Time.milliseconds(0L)); + timeout); RegistrationResponse registrationResponse = registrationResponseFuture.get(); assertTrue(registrationResponse instanceof JobMasterRegistrationSuccess); - resourceManager.requestSlot(jmLeaderId, rmLeaderId, slotRequest, Time.milliseconds(0L)); + CompletableFuture slotAck = rmGateway.requestSlot(jmLeaderId, rmLeaderId, slotRequest, timeout); + + slotAck.get(); - verify(jmGateway).offerSlots( + verify(jmGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots( eq(taskManagerResourceId), (Iterable)argThat(Matchers.contains(slotOffer)), eq(jmLeaderId), any(Time.class));