Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B5105200BB1 for ; Wed, 28 Sep 2016 10:20:51 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B3BFC160AD4; Wed, 28 Sep 2016 08:20:51 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BA196160AFC for ; Wed, 28 Sep 2016 10:20:46 +0200 (CEST) Received: (qmail 93421 invoked by uid 500); 28 Sep 2016 08:20:45 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 91823 invoked by uid 99); 28 Sep 2016 08:20:44 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Sep 2016 08:20:44 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B968EEF79F; Wed, 28 Sep 2016 08:20:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Date: Wed, 28 Sep 2016 08:21:07 -0000 Message-Id: <16936c9b5d834ddb8ee570c2f3581b69@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [24/50] [abbrv] flink git commit: [FLINK-4529] [flip-6] Move TaskExecutor, JobMaster and ResourceManager out of the rpc package archived-at: Wed, 28 Sep 2016 08:20:51 -0000 http://git-wip-us.apache.org/repos/asf/flink/blob/171cfd30/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java new file mode 100644 index 0000000..52d9d06 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.ResourceSlot; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.junit.Before; +import org.junit.Test; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +public class SlotManagerTest { + + private static final double DEFAULT_TESTING_CPU_CORES = 1.0; + + private static final long DEFAULT_TESTING_MEMORY = 512; + + private static final ResourceProfile DEFAULT_TESTING_PROFILE = + new ResourceProfile(DEFAULT_TESTING_CPU_CORES, DEFAULT_TESTING_MEMORY); + + private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE = + new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, DEFAULT_TESTING_MEMORY * 2); + + private ResourceManagerGateway resourceManagerGateway; + + @Before + public void setUp() { + resourceManagerGateway = mock(ResourceManagerGateway.class); + } + + /** + * Tests that there are no free slots when we request, need to allocate from cluster manager master + */ + @Test + public void testRequestSlotWithoutFreeSlot() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); + + assertEquals(0, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(1, slotManager.getPendingRequestCount()); + assertEquals(1, slotManager.getAllocatedContainers().size()); + assertEquals(DEFAULT_TESTING_PROFILE, slotManager.getAllocatedContainers().get(0)); + } + + /** + * Tests that there are some free slots when we request, and the request is fulfilled immediately + */ + @Test + public void testRequestSlotWithFreeSlot() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + + directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1); + assertEquals(1, slotManager.getFreeSlotCount()); + + slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + assertEquals(0, slotManager.getAllocatedContainers().size()); + } + + /** + * Tests that there are some free slots when we request, but none of them are suitable + */ + @Test + public void testRequestSlotWithoutSuitableSlot() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + + directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 2); + assertEquals(2, slotManager.getFreeSlotCount()); + + slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE)); + assertEquals(0, slotManager.getAllocatedSlotCount()); + assertEquals(2, slotManager.getFreeSlotCount()); + assertEquals(1, slotManager.getPendingRequestCount()); + assertEquals(1, slotManager.getAllocatedContainers().size()); + assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0)); + } + + /** + * Tests that we send duplicated slot request + */ + @Test + public void testDuplicatedSlotRequest() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1); + + SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); + SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE); + + slotManager.requestSlot(request1); + slotManager.requestSlot(request2); + slotManager.requestSlot(request2); + slotManager.requestSlot(request1); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(1, slotManager.getPendingRequestCount()); + assertEquals(1, slotManager.getAllocatedContainers().size()); + assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0)); + } + + /** + * Tests that we send multiple slot requests + */ + @Test + public void testRequestMultipleSlots() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 5); + + // request 3 normal slots + for (int i = 0; i < 3; ++i) { + slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); + } + + // request 2 big slots + for (int i = 0; i < 2; ++i) { + slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE)); + } + + // request 1 normal slot again + slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); + + assertEquals(4, slotManager.getAllocatedSlotCount()); + assertEquals(1, slotManager.getFreeSlotCount()); + assertEquals(2, slotManager.getPendingRequestCount()); + assertEquals(2, slotManager.getAllocatedContainers().size()); + assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0)); + assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(1)); + } + + /** + * Tests that a new slot appeared in SlotReport, and we used it to fulfill a pending request + */ + @Test + public void testNewlyAppearedFreeSlotFulfillPendingRequest() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); + assertEquals(1, slotManager.getPendingRequestCount()); + + SlotID slotId = SlotID.generate(); + SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); + slotManager.updateSlotStatus(slotStatus); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + assertTrue(slotManager.isAllocated(slotId)); + } + + /** + * Tests that a new slot appeared in SlotReport, but we have no pending request + */ + @Test + public void testNewlyAppearedFreeSlot() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + SlotID slotId = SlotID.generate(); + SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); + slotManager.updateSlotStatus(slotStatus); + + assertEquals(0, slotManager.getAllocatedSlotCount()); + assertEquals(1, slotManager.getFreeSlotCount()); + } + + /** + * Tests that a new slot appeared in SlotReport, but it't not suitable for all the pending requests + */ + @Test + public void testNewlyAppearedFreeSlotNotMatchPendingRequests() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE)); + assertEquals(1, slotManager.getPendingRequestCount()); + + SlotID slotId = SlotID.generate(); + SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); + slotManager.updateSlotStatus(slotStatus); + + assertEquals(0, slotManager.getAllocatedSlotCount()); + assertEquals(1, slotManager.getFreeSlotCount()); + assertEquals(1, slotManager.getPendingRequestCount()); + assertFalse(slotManager.isAllocated(slotId)); + } + + /** + * Tests that a new slot appeared in SlotReport, and it's been reported using by some job + */ + @Test + public void testNewlyAppearedInUseSlot() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + + SlotID slotId = SlotID.generate(); + SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID()); + slotManager.updateSlotStatus(slotStatus); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertTrue(slotManager.isAllocated(slotId)); + } + + /** + * Tests that we had a slot in-use, and it's confirmed by SlotReport + */ + @Test + public void testExistingInUseSlotUpdateStatus() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); + slotManager.requestSlot(request); + + // make this slot in use + SlotID slotId = SlotID.generate(); + SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); + slotManager.updateSlotStatus(slotStatus); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertTrue(slotManager.isAllocated(slotId)); + + // slot status is confirmed + SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, + request.getAllocationId(), request.getJobId()); + slotManager.updateSlotStatus(slotStatus2); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertTrue(slotManager.isAllocated(slotId)); + } + + /** + * Tests that we had a slot in-use, but it's empty according to the SlotReport + */ + @Test + public void testExistingInUseSlotAdjustedToEmpty() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); + slotManager.requestSlot(request1); + + // make this slot in use + SlotID slotId = SlotID.generate(); + SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); + slotManager.updateSlotStatus(slotStatus); + + // another request pending + SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); + slotManager.requestSlot(request2); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(1, slotManager.getPendingRequestCount()); + assertTrue(slotManager.isAllocated(slotId)); + assertTrue(slotManager.isAllocated(request1.getAllocationId())); + + + // but slot is reported empty again, request2 will be fulfilled, request1 will be missing + slotManager.updateSlotStatus(slotStatus); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + assertTrue(slotManager.isAllocated(slotId)); + assertTrue(slotManager.isAllocated(request2.getAllocationId())); + } + + /** + * Tests that we had a slot in use, and it's also reported in use by TaskManager, but the allocation + * information didn't match. + */ + @Test + public void testExistingInUseSlotWithDifferentAllocationInfo() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); + slotManager.requestSlot(request); + + // make this slot in use + SlotID slotId = SlotID.generate(); + SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); + slotManager.updateSlotStatus(slotStatus); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + assertTrue(slotManager.isAllocated(slotId)); + assertTrue(slotManager.isAllocated(request.getAllocationId())); + + SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID()); + // update slot status with different allocation info + slotManager.updateSlotStatus(slotStatus2); + + // original request is missing and won't be allocated + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + assertTrue(slotManager.isAllocated(slotId)); + assertFalse(slotManager.isAllocated(request.getAllocationId())); + assertTrue(slotManager.isAllocated(slotStatus2.getAllocationID())); + } + + /** + * Tests that we had a free slot, and it's confirmed by SlotReport + */ + @Test + public void testExistingEmptySlotUpdateStatus() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE); + slotManager.addFreeSlot(slot); + + SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE); + slotManager.updateSlotStatus(slotStatus); + + assertEquals(0, slotManager.getAllocatedSlotCount()); + assertEquals(1, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + } + + /** + * Tests that we had a free slot, and it's reported in-use by TaskManager + */ + @Test + public void testExistingEmptySlotAdjustedToInUse() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE); + slotManager.addFreeSlot(slot); + + SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE, + new AllocationID(), new JobID()); + slotManager.updateSlotStatus(slotStatus); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + assertTrue(slotManager.isAllocated(slot.getSlotId())); + } + + /** + * Tests that we did some allocation but failed / rejected by TaskManager, request will retry + */ + @Test + public void testSlotAllocationFailedAtTaskManager() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE); + slotManager.addFreeSlot(slot); + + SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); + slotManager.requestSlot(request); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + assertTrue(slotManager.isAllocated(slot.getSlotId())); + + slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId()); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + } + + + /** + * Tests that we did some allocation but failed / rejected by TaskManager, and slot is occupied by another request + */ + @Test + public void testSlotAllocationFailedAtTaskManagerOccupiedByOther() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE); + slotManager.addFreeSlot(slot); + + SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); + slotManager.requestSlot(request); + + // slot is set empty by heartbeat + SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), slot.getResourceProfile()); + slotManager.updateSlotStatus(slotStatus); + + // another request took this slot + SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); + slotManager.requestSlot(request2); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + assertFalse(slotManager.isAllocated(request.getAllocationId())); + assertTrue(slotManager.isAllocated(request2.getAllocationId())); + + // original request should be pended + slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId()); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(1, slotManager.getPendingRequestCount()); + assertFalse(slotManager.isAllocated(request.getAllocationId())); + assertTrue(slotManager.isAllocated(request2.getAllocationId())); + } + + @Test + public void testNotifyTaskManagerFailure() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + + ResourceID resource1 = ResourceID.generate(); + ResourceID resource2 = ResourceID.generate(); + + ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 1), DEFAULT_TESTING_PROFILE); + ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 2), DEFAULT_TESTING_PROFILE); + ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 1), DEFAULT_TESTING_PROFILE); + ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 2), DEFAULT_TESTING_PROFILE); + + slotManager.addFreeSlot(slot11); + slotManager.addFreeSlot(slot21); + + slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); + slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); + + assertEquals(2, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + + slotManager.addFreeSlot(slot12); + slotManager.addFreeSlot(slot22); + + assertEquals(2, slotManager.getAllocatedSlotCount()); + assertEquals(2, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + + slotManager.notifyTaskManagerFailure(resource2); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(1, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + + // notify an not exist resource failure + slotManager.notifyTaskManagerFailure(ResourceID.generate()); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(1, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + } + + // ------------------------------------------------------------------------ + // testing utilities + // ------------------------------------------------------------------------ + + private void directlyProvideFreeSlots( + final SlotManager slotManager, + final ResourceProfile resourceProfile, + final int freeSlotNum) + { + for (int i = 0; i < freeSlotNum; ++i) { + slotManager.addFreeSlot(new ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile))); + } + } + + // ------------------------------------------------------------------------ + // testing classes + // ------------------------------------------------------------------------ + + private static class TestingSlotManager extends SlotManager { + + private final List allocatedContainers; + + TestingSlotManager(ResourceManagerGateway resourceManagerGateway) { + super(resourceManagerGateway); + this.allocatedContainers = new LinkedList<>(); + } + + /** + * Choose slot randomly if it matches requirement + * + * @param request The slot request + * @param freeSlots All slots which can be used + * @return The chosen slot or null if cannot find a match + */ + @Override + protected ResourceSlot chooseSlotToUse(SlotRequest request, Map freeSlots) { + for (ResourceSlot slot : freeSlots.values()) { + if (slot.isMatchingRequirement(request.getResourceProfile())) { + return slot; + } + } + return null; + } + + /** + * Choose request randomly if offered slot can match its requirement + * + * @param offeredSlot The free slot + * @param pendingRequests All the pending slot requests + * @return The chosen request's AllocationID or null if cannot find a match + */ + @Override + protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot, + Map pendingRequests) + { + for (Map.Entry pendingRequest : pendingRequests.entrySet()) { + if (offeredSlot.isMatchingRequirement(pendingRequest.getValue().getResourceProfile())) { + return pendingRequest.getValue(); + } + } + return null; + } + + @Override + protected void allocateContainer(ResourceProfile resourceProfile) { + allocatedContainers.add(resourceProfile); + } + + List getAllocatedContainers() { + return allocatedContainers; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/171cfd30/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java index 2790cf8..f55069e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java @@ -21,28 +21,14 @@ package org.apache.flink.runtime.rpc.akka; import akka.actor.ActorSystem; import akka.util.Timeout; import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.highavailability.NonHaServices; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.rpc.jobmaster.JobMaster; -import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway; -import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.Test; -import org.mockito.Mockito; -import scala.concurrent.duration.Deadline; -import scala.concurrent.duration.FiniteDuration; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class AkkaRpcServiceTest extends TestLogger { http://git-wip-us.apache.org/repos/asf/flink/blob/171cfd30/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/RetryingRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/RetryingRegistrationTest.java deleted file mode 100644 index 9508825..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/RetryingRegistrationTest.java +++ /dev/null @@ -1,336 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.registration; - -import akka.dispatch.Futures; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.TestingRpcService; -import org.apache.flink.util.TestLogger; - -import org.junit.Test; - -import org.slf4j.LoggerFactory; - -import scala.concurrent.Await; -import scala.concurrent.ExecutionContext$; -import scala.concurrent.Future; -import scala.concurrent.Promise; -import scala.concurrent.duration.FiniteDuration; - -import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeoutException; - -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - -/** - * Tests for the generic retrying registration class, validating the failure, retry, and back-off behavior. - */ -public class RetryingRegistrationTest extends TestLogger { - - @Test - public void testSimpleSuccessfulRegistration() throws Exception { - final String testId = "laissez les bon temps roulez"; - final String testEndpointAddress = ""; - final UUID leaderId = UUID.randomUUID(); - - // an endpoint that immediately returns success - TestRegistrationGateway testGateway = new TestRegistrationGateway(new TestRegistrationSuccess(testId)); - TestingRpcService rpc = new TestingRpcService(); - - try { - rpc.registerGateway(testEndpointAddress, testGateway); - - TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId); - registration.startRegistration(); - - Future> future = registration.getFuture(); - assertNotNull(future); - - // multiple accesses return the same future - assertEquals(future, registration.getFuture()); - - Tuple2 success = - Await.result(future, new FiniteDuration(10, SECONDS)); - - // validate correct invocation and result - assertEquals(testId, success.f1.getCorrelationId()); - assertEquals(leaderId, testGateway.getInvocations().take().leaderId()); - } - finally { - testGateway.stop(); - rpc.stopService(); - } - } - - @Test - public void testPropagateFailures() throws Exception { - final String testExceptionMessage = "testExceptionMessage"; - - // RPC service that fails with exception upon the connection - RpcService rpc = mock(RpcService.class); - when(rpc.connect(anyString(), any(Class.class))).thenThrow(new RuntimeException(testExceptionMessage)); - - TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "testaddress", UUID.randomUUID()); - registration.startRegistration(); - - Future future = registration.getFuture(); - assertTrue(future.failed().isCompleted()); - - assertEquals(testExceptionMessage, future.failed().value().get().get().getMessage()); - } - - @Test - public void testRetryConnectOnFailure() throws Exception { - final String testId = "laissez les bon temps roulez"; - final UUID leaderId = UUID.randomUUID(); - - ExecutorService executor = Executors.newCachedThreadPool(); - TestRegistrationGateway testGateway = new TestRegistrationGateway(new TestRegistrationSuccess(testId)); - - try { - // RPC service that fails upon the first connection, but succeeds on the second - RpcService rpc = mock(RpcService.class); - when(rpc.connect(anyString(), any(Class.class))).thenReturn( - Futures.failed(new Exception("test connect failure")), // first connection attempt fails - Futures.successful(testGateway) // second connection attempt succeeds - ); - when(rpc.getExecutionContext()).thenReturn(ExecutionContext$.MODULE$.fromExecutor(executor)); - - TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "foobar address", leaderId); - registration.startRegistration(); - - Tuple2 success = - Await.result(registration.getFuture(), new FiniteDuration(10, SECONDS)); - - // validate correct invocation and result - assertEquals(testId, success.f1.getCorrelationId()); - assertEquals(leaderId, testGateway.getInvocations().take().leaderId()); - } - finally { - testGateway.stop(); - executor.shutdown(); - } - } - - @Test - public void testRetriesOnTimeouts() throws Exception { - final String testId = "rien ne va plus"; - final String testEndpointAddress = ""; - final UUID leaderId = UUID.randomUUID(); - - // an endpoint that immediately returns futures with timeouts before returning a successful future - TestRegistrationGateway testGateway = new TestRegistrationGateway( - null, // timeout - null, // timeout - new TestRegistrationSuccess(testId) // success - ); - - TestingRpcService rpc = new TestingRpcService(); - - try { - rpc.registerGateway(testEndpointAddress, testGateway); - - TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId); - - long started = System.nanoTime(); - registration.startRegistration(); - - Future> future = registration.getFuture(); - Tuple2 success = - Await.result(future, new FiniteDuration(10, SECONDS)); - - long finished = System.nanoTime(); - long elapsedMillis = (finished - started) / 1000000; - - // validate correct invocation and result - assertEquals(testId, success.f1.getCorrelationId()); - assertEquals(leaderId, testGateway.getInvocations().take().leaderId()); - - // validate that some retry-delay / back-off behavior happened - assertTrue("retries did not properly back off", elapsedMillis >= 3 * TestRetryingRegistration.INITIAL_TIMEOUT); - } - finally { - rpc.stopService(); - testGateway.stop(); - } - } - - @Test - public void testDecline() throws Exception { - final String testId = "qui a coupe le fromage"; - final String testEndpointAddress = ""; - final UUID leaderId = UUID.randomUUID(); - - TestingRpcService rpc = new TestingRpcService(); - - TestRegistrationGateway testGateway = new TestRegistrationGateway( - null, // timeout - new RegistrationResponse.Decline("no reason "), - null, // timeout - new TestRegistrationSuccess(testId) // success - ); - - try { - rpc.registerGateway(testEndpointAddress, testGateway); - - TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId); - - long started = System.nanoTime(); - registration.startRegistration(); - - Future> future = registration.getFuture(); - Tuple2 success = - Await.result(future, new FiniteDuration(10, SECONDS)); - - long finished = System.nanoTime(); - long elapsedMillis = (finished - started) / 1000000; - - // validate correct invocation and result - assertEquals(testId, success.f1.getCorrelationId()); - assertEquals(leaderId, testGateway.getInvocations().take().leaderId()); - - // validate that some retry-delay / back-off behavior happened - assertTrue("retries did not properly back off", elapsedMillis >= - 2 * TestRetryingRegistration.INITIAL_TIMEOUT + TestRetryingRegistration.DELAY_ON_DECLINE); - } - finally { - testGateway.stop(); - rpc.stopService(); - } - } - - @Test - @SuppressWarnings("unchecked") - public void testRetryOnError() throws Exception { - final String testId = "Petit a petit, l'oiseau fait son nid"; - final String testEndpointAddress = ""; - final UUID leaderId = UUID.randomUUID(); - - TestingRpcService rpc = new TestingRpcService(); - - try { - // gateway that upon calls first responds with a failure, then with a success - TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class); - - when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn( - Futures.failed(new Exception("test exception")), - Futures.successful(new TestRegistrationSuccess(testId))); - - rpc.registerGateway(testEndpointAddress, testGateway); - - TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId); - - long started = System.nanoTime(); - registration.startRegistration(); - - Future> future = registration.getFuture(); - Tuple2 success = - Await.result(future, new FiniteDuration(10, SECONDS)); - - long finished = System.nanoTime(); - long elapsedMillis = (finished - started) / 1000000; - - assertEquals(testId, success.f1.getCorrelationId()); - - // validate that some retry-delay / back-off behavior happened - assertTrue("retries did not properly back off", - elapsedMillis >= TestRetryingRegistration.DELAY_ON_ERROR); - } - finally { - rpc.stopService(); - } - } - - @Test - public void testCancellation() throws Exception { - final String testEndpointAddress = "my-test-address"; - final UUID leaderId = UUID.randomUUID(); - - TestingRpcService rpc = new TestingRpcService(); - - try { - Promise result = Futures.promise(); - - TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class); - when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(result.future()); - - rpc.registerGateway(testEndpointAddress, testGateway); - - TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId); - registration.startRegistration(); - - // cancel and fail the current registration attempt - registration.cancel(); - result.failure(new TimeoutException()); - - // there should not be a second registration attempt - verify(testGateway, atMost(1)).registrationCall(any(UUID.class), anyLong()); - } - finally { - rpc.stopService(); - } - } - - // ------------------------------------------------------------------------ - // test registration - // ------------------------------------------------------------------------ - - private static class TestRegistrationSuccess extends RegistrationResponse.Success { - private static final long serialVersionUID = 5542698790917150604L; - - private final String correlationId; - - private TestRegistrationSuccess(String correlationId) { - this.correlationId = correlationId; - } - - public String getCorrelationId() { - return correlationId; - } - } - - private static class TestRetryingRegistration extends RetryingRegistration { - - // we use shorter timeouts here to speed up the tests - static final long INITIAL_TIMEOUT = 20; - static final long MAX_TIMEOUT = 200; - static final long DELAY_ON_ERROR = 200; - static final long DELAY_ON_DECLINE = 200; - - public TestRetryingRegistration(RpcService rpc, String targetAddress, UUID leaderId) { - super(LoggerFactory.getLogger(RetryingRegistrationTest.class), - rpc, "TestEndpoint", - TestRegistrationGateway.class, - targetAddress, leaderId, - INITIAL_TIMEOUT, MAX_TIMEOUT, DELAY_ON_ERROR, DELAY_ON_DECLINE); - } - - @Override - protected Future invokeRegistration( - TestRegistrationGateway gateway, UUID leaderId, long timeoutMillis) { - return gateway.registrationCall(leaderId, timeoutMillis); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/171cfd30/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/TestRegistrationGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/TestRegistrationGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/TestRegistrationGateway.java deleted file mode 100644 index a049e48..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/TestRegistrationGateway.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.registration; - -import akka.dispatch.Futures; - -import org.apache.flink.runtime.rpc.TestingGatewayBase; -import org.apache.flink.util.Preconditions; - -import scala.concurrent.Future; - -import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -public class TestRegistrationGateway extends TestingGatewayBase { - - private final BlockingQueue invocations; - - private final RegistrationResponse[] responses; - - private int pos; - - public TestRegistrationGateway(RegistrationResponse... responses) { - Preconditions.checkArgument(responses != null && responses.length > 0); - - this.invocations = new LinkedBlockingQueue<>(); - this.responses = responses; - - } - - // ------------------------------------------------------------------------ - - public Future registrationCall(UUID leaderId, long timeout) { - invocations.add(new RegistrationCall(leaderId, timeout)); - - RegistrationResponse response = responses[pos]; - if (pos < responses.length - 1) { - pos++; - } - - // return a completed future (for a proper value), or one that never completes and will time out (for null) - return response != null ? Futures.successful(response) : this.futureWithTimeout(timeout); - } - - public BlockingQueue getInvocations() { - return invocations; - } - - // ------------------------------------------------------------------------ - - public static class RegistrationCall { - private final UUID leaderId; - private final long timeout; - - public RegistrationCall(UUID leaderId, long timeout) { - this.leaderId = leaderId; - this.timeout = timeout; - } - - public UUID leaderId() { - return leaderId; - } - - public long timeout() { - return timeout; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/171cfd30/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java deleted file mode 100644 index dfffeda..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.resourcemanager; - -import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; -import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; -import org.apache.flink.runtime.rpc.MainThreadExecutor; -import org.apache.flink.runtime.rpc.RpcEndpoint; -import org.apache.flink.runtime.rpc.RpcGateway; -import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.StartStoppable; -import org.junit.Assert; -import org.junit.Test; - -import java.util.UUID; - -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doCallRealMethod; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -/** - * resourceManager HA test, including grant leadership and revoke leadership - */ -public class ResourceManagerHATest { - - @Test - public void testGrantAndRevokeLeadership() throws Exception { - // mock a RpcService which will return a special RpcGateway when call its startServer method, the returned RpcGateway directly execute runAsync call - TestingResourceManagerGatewayProxy gateway = mock(TestingResourceManagerGatewayProxy.class); - doCallRealMethod().when(gateway).runAsync(any(Runnable.class)); - - RpcService rpcService = mock(RpcService.class); - when(rpcService.startServer(any(RpcEndpoint.class))).thenReturn(gateway); - - TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService(); - TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); - highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService); - - final ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices); - resourceManager.start(); - // before grant leadership, resourceManager's leaderId is null - Assert.assertNull(resourceManager.getLeaderSessionID()); - final UUID leaderId = UUID.randomUUID(); - leaderElectionService.isLeader(leaderId); - // after grant leadership, resourceManager's leaderId has value - Assert.assertEquals(leaderId, resourceManager.getLeaderSessionID()); - // then revoke leadership, resourceManager's leaderId is null again - leaderElectionService.notLeader(); - Assert.assertNull(resourceManager.getLeaderSessionID()); - } - - private static abstract class TestingResourceManagerGatewayProxy implements MainThreadExecutor, StartStoppable, RpcGateway { - @Override - public void runAsync(Runnable runnable) { - runnable.run(); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/171cfd30/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java deleted file mode 100644 index 25a670c..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.taskexecutor; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.highavailability.NonHaServices; -import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; -import org.apache.flink.runtime.rpc.TestingRpcService; -import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway; -import org.apache.flink.util.TestLogger; - -import org.junit.Test; - -import scala.concurrent.duration.FiniteDuration; - -import java.util.UUID; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - -public class TaskExecutorTest extends TestLogger { - - @Test - public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception { - final ResourceID resourceID = ResourceID.generate(); - final String resourceManagerAddress = "/resource/manager/address/one"; - - final TestingRpcService rpc = new TestingRpcService(); - try { - // register a mock resource manager gateway - ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class); - rpc.registerGateway(resourceManagerAddress, rmGateway); - - NonHaServices haServices = new NonHaServices(resourceManagerAddress); - TaskExecutor taskManager = TaskExecutor.startTaskManagerComponentsAndActor( - new Configuration(), resourceID, rpc, "localhost", haServices, true); - String taskManagerAddress = taskManager.getAddress(); - taskManager.start(); - - verify(rmGateway, timeout(5000)).registerTaskExecutor( - any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class)); - } - finally { - rpc.stopService(); - } - } - - @Test - public void testTriggerRegistrationOnLeaderChange() throws Exception { - final ResourceID resourceID = ResourceID.generate(); - - final String address1 = "/resource/manager/address/one"; - final String address2 = "/resource/manager/address/two"; - final UUID leaderId1 = UUID.randomUUID(); - final UUID leaderId2 = UUID.randomUUID(); - - final TestingRpcService rpc = new TestingRpcService(); - try { - // register the mock resource manager gateways - ResourceManagerGateway rmGateway1 = mock(ResourceManagerGateway.class); - ResourceManagerGateway rmGateway2 = mock(ResourceManagerGateway.class); - rpc.registerGateway(address1, rmGateway1); - rpc.registerGateway(address2, rmGateway2); - - TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService(); - - TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); - haServices.setResourceManagerLeaderRetriever(testLeaderService); - - TaskExecutor taskManager = TaskExecutor.startTaskManagerComponentsAndActor( - new Configuration(), resourceID, rpc, "localhost", haServices, true); - String taskManagerAddress = taskManager.getAddress(); - taskManager.start(); - - // no connection initially, since there is no leader - assertNull(taskManager.getResourceManagerConnection()); - - // define a leader and see that a registration happens - testLeaderService.notifyListener(address1, leaderId1); - - verify(rmGateway1, timeout(5000)).registerTaskExecutor( - eq(leaderId1), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class)); - assertNotNull(taskManager.getResourceManagerConnection()); - - // cancel the leader - testLeaderService.notifyListener(null, null); - - // set a new leader, see that a registration happens - testLeaderService.notifyListener(address2, leaderId2); - - verify(rmGateway2, timeout(5000)).registerTaskExecutor( - eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class)); - assertNotNull(taskManager.getResourceManagerConnection()); - } - finally { - rpc.stopService(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/171cfd30/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java new file mode 100644 index 0000000..a8d5bd7 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.highavailability.NonHaServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import scala.concurrent.duration.FiniteDuration; + +import java.util.UUID; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +public class TaskExecutorTest extends TestLogger { + + @Test + public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception { + final ResourceID resourceID = ResourceID.generate(); + final String resourceManagerAddress = "/resource/manager/address/one"; + + final TestingRpcService rpc = new TestingRpcService(); + try { + // register a mock resource manager gateway + ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class); + rpc.registerGateway(resourceManagerAddress, rmGateway); + + NonHaServices haServices = new NonHaServices(resourceManagerAddress); + TaskExecutor taskManager = TaskExecutor.startTaskManagerComponentsAndActor( + new Configuration(), resourceID, rpc, "localhost", haServices, true); + String taskManagerAddress = taskManager.getAddress(); + taskManager.start(); + + verify(rmGateway, timeout(5000)).registerTaskExecutor( + any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class)); + } + finally { + rpc.stopService(); + } + } + + @Test + public void testTriggerRegistrationOnLeaderChange() throws Exception { + final ResourceID resourceID = ResourceID.generate(); + + final String address1 = "/resource/manager/address/one"; + final String address2 = "/resource/manager/address/two"; + final UUID leaderId1 = UUID.randomUUID(); + final UUID leaderId2 = UUID.randomUUID(); + + final TestingRpcService rpc = new TestingRpcService(); + try { + // register the mock resource manager gateways + ResourceManagerGateway rmGateway1 = mock(ResourceManagerGateway.class); + ResourceManagerGateway rmGateway2 = mock(ResourceManagerGateway.class); + rpc.registerGateway(address1, rmGateway1); + rpc.registerGateway(address2, rmGateway2); + + TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService(); + + TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); + haServices.setResourceManagerLeaderRetriever(testLeaderService); + + TaskExecutor taskManager = TaskExecutor.startTaskManagerComponentsAndActor( + new Configuration(), resourceID, rpc, "localhost", haServices, true); + String taskManagerAddress = taskManager.getAddress(); + taskManager.start(); + + // no connection initially, since there is no leader + assertNull(taskManager.getResourceManagerConnection()); + + // define a leader and see that a registration happens + testLeaderService.notifyListener(address1, leaderId1); + + verify(rmGateway1, timeout(5000)).registerTaskExecutor( + eq(leaderId1), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class)); + assertNotNull(taskManager.getResourceManagerConnection()); + + // cancel the leader + testLeaderService.notifyListener(null, null); + + // set a new leader, see that a registration happens + testLeaderService.notifyListener(address2, leaderId2); + + verify(rmGateway2, timeout(5000)).registerTaskExecutor( + eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class)); + assertNotNull(taskManager.getResourceManagerConnection()); + } + finally { + rpc.stopService(); + } + } +}