Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 7C438200B7C for ; Thu, 8 Sep 2016 17:28:34 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 7AF41160AAD; Thu, 8 Sep 2016 15:28:34 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8582B160AD7 for ; Thu, 8 Sep 2016 17:28:32 +0200 (CEST) Received: (qmail 34392 invoked by uid 500); 8 Sep 2016 15:28:26 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 32671 invoked by uid 99); 8 Sep 2016 15:28:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Sep 2016 15:28:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A3381EF79F; Thu, 8 Sep 2016 15:28:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Date: Thu, 08 Sep 2016 15:29:01 -0000 Message-Id: <039343a33b91457b953fda490622b513@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [39/50] [abbrv] flink git commit: [FLINK-4347][cluster management] Implement SlotManager core archived-at: Thu, 08 Sep 2016 15:28:34 -0000 [FLINK-4347][cluster management] Implement SlotManager core This closes #2388 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/108f43f7 Tree: 
http://git-wip-us.apache.org/repos/asf/flink/tree/108f43f7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/108f43f7 Branch: refs/heads/flip-6 Commit: 108f43f7e9dec63815df1272b6d819715d76b78d Parents: 73de842 Author: Kurt Young Authored: Thu Aug 18 15:48:30 2016 +0800 Committer: Till Rohrmann Committed: Thu Sep 8 17:27:00 2016 +0200 ---------------------------------------------------------------------- .../runtime/clusterframework/SlotManager.java | 525 ++++++++++++++++++ .../clusterframework/types/ResourceID.java | 4 +- .../clusterframework/types/ResourceProfile.java | 5 + .../clusterframework/types/ResourceSlot.java | 66 +++ .../runtime/clusterframework/types/SlotID.java | 14 +- .../rpc/resourcemanager/SlotRequest.java | 51 +- .../runtime/rpc/taskexecutor/SlotReport.java | 56 ++ .../runtime/rpc/taskexecutor/SlotStatus.java | 129 +++++ .../clusterframework/SlotManagerTest.java | 540 +++++++++++++++++++ 9 files changed, 1382 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/108f43f7/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/SlotManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/SlotManager.java new file mode 100644 index 0000000..cc140a1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/SlotManager.java @@ -0,0 +1,525 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.ResourceSlot; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest; +import org.apache.flink.runtime.rpc.taskexecutor.SlotReport; +import org.apache.flink.runtime.rpc.taskexecutor.SlotStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * SlotManager is responsible for receiving slot requests and do slot allocations. It allows to request + * slots from registered TaskManagers and issues container allocation requests in case of there are not + * enough available slots. Besides, it should sync its slot allocation with TaskManager's heartbeat. + *

+ * The main operation principle of SlotManager is: + *

    + *
  • 1. All slot allocation status should be synced with TaskManager, which is the ground truth.
  • + *
  • 2. All slots that have been registered must be tracked, either in the free pool or in the allocated pool.
  • + *
  • 3. All slot requests are handled on a best-effort basis; there is no guarantee that a request will be + * fulfilled in time or correctly allocated. Conflicts, timeouts or other errors may happen, and they must + * be handled outside SlotManager. SlotManager makes each decision based on the information it currently + * holds.
  • + *
+ * IMPORTANT: This class is Not Thread-safe. + */ +public abstract class SlotManager { + + private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class); + + /** Gateway to communicate with ResourceManager */ + private final ResourceManagerGateway resourceManagerGateway; + + /** All registered slots, including free and allocated slots */ + private final Map> registeredSlots; + + /** All pending slot requests, waiting available slots to fulfil */ + private final Map pendingSlotRequests; + + /** All free slots that can be used to be allocated */ + private final Map freeSlots; + + /** All allocations, we can lookup allocations either by SlotID or AllocationID */ + private final AllocationMap allocationMap; + + public SlotManager(ResourceManagerGateway resourceManagerGateway) { + this.resourceManagerGateway = checkNotNull(resourceManagerGateway); + this.registeredSlots = new HashMap<>(16); + this.pendingSlotRequests = new LinkedHashMap<>(16); + this.freeSlots = new HashMap<>(16); + this.allocationMap = new AllocationMap(); + } + + // ------------------------------------------------------------------------ + // slot managements + // ------------------------------------------------------------------------ + + /** + * Request a slot with requirements, we may either fulfill the request or pending it. Trigger container + * allocation if we don't have enough resource. If we have free slot which can match the request, record + * this allocation and forward the request to TaskManager through ResourceManager (we want this done by + * RPC's main thread to avoid race condition). 
+ * + * @param request The detailed request of the slot + */ + public void requestSlot(final SlotRequest request) { + if (isRequestDuplicated(request)) { + LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId()); + return; + } + + // try to fulfil the request with current free slots + ResourceSlot slot = chooseSlotToUse(request, freeSlots); + if (slot != null) { + LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(), + request.getAllocationId(), request.getJobId()); + + // record this allocation in bookkeeping + allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId()); + + // remove selected slot from free pool + freeSlots.remove(slot.getSlotId()); + + // TODO: send slot request to TaskManager + } else { + LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " + + "AllocationID:{}, JobID:{}", request.getAllocationId(), request.getJobId()); + allocateContainer(request.getResourceProfile()); + pendingSlotRequests.put(request.getAllocationId(), request); + } + } + + /** + * Sync slot status with TaskManager's SlotReport. + */ + public void updateSlotStatus(final SlotReport slotReport) { + for (SlotStatus slotStatus : slotReport.getSlotsStatus()) { + updateSlotStatus(slotStatus); + } + } + + /** + * The slot request to TaskManager may be either failed by rpc communication (timeout, network error, etc.) + * or really rejected by TaskManager. We shall retry this request by: + *
    + *
  • 1. verify and clear all the previous allocation information for this request + *
  • 2. try to request slot again + *
+ *

+ * This may cause some duplicate allocation, e.g. the slot request to TaskManager is successful but the response + * is lost somehow, so we may request a slot in another TaskManager, this causes two slots assigned to one request, + * but it can be taken care of by rejecting registration at JobManager. + * + * @param originalRequest The original slot request + * @param slotId The target SlotID + */ + public void handleSlotRequestFailedAtTaskManager(final SlotRequest originalRequest, final SlotID slotId) { + final AllocationID originalAllocationId = originalRequest.getAllocationId(); + LOG.info("Slot request failed at TaskManager, SlotID:{}, AllocationID:{}, JobID:{}", + slotId, originalAllocationId, originalRequest.getJobId()); + + // verify the allocation info before we do anything + if (freeSlots.containsKey(slotId)) { + // this slot is currently empty, no need to de-allocate it from our allocations + LOG.info("Original slot is somehow empty, retrying this request"); + + // before retry, we should double check whether this request was allocated by some other ways + if (!allocationMap.isAllocated(originalAllocationId)) { + requestSlot(originalRequest); + } else { + LOG.info("The failed request has somehow been allocated, SlotID:{}", + allocationMap.getSlotID(originalAllocationId)); + } + } else if (allocationMap.isAllocated(slotId)) { + final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId); + + // check whether we have an agreement on whom this slot belongs to + if (originalAllocationId.equals(currentAllocationId)) { + LOG.info("De-allocate this request and retry"); + allocationMap.removeAllocation(currentAllocationId); + + // put this slot back to free pool + ResourceSlot slot = checkNotNull(getRegisteredSlot(slotId)); + freeSlots.put(slotId, slot); + + // retry the request + requestSlot(originalRequest); + } else { + // the slot is taken by someone else, no need to de-allocate it from our allocations + LOG.info("Original slot is taken by 
someone else, current AllocationID:{}", currentAllocationId); + + // before retry, we should double check whether this request was allocated by some other ways + if (!allocationMap.isAllocated(originalAllocationId)) { + requestSlot(originalRequest); + } else { + LOG.info("The failed request is somehow been allocated, SlotID:{}", + allocationMap.getSlotID(originalAllocationId)); + } + } + } else { + LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId); + } + } + + /** + * Callback for TaskManager failures. In case that a TaskManager fails, we have to clean up all its slots. + * + * @param resourceId The ResourceID of the TaskManager + */ + public void notifyTaskManagerFailure(final ResourceID resourceId) { + LOG.info("Resource:{} been notified failure", resourceId); + final Map slotIdsToRemove = registeredSlots.remove(resourceId); + if (slotIdsToRemove != null) { + for (SlotID slotId : slotIdsToRemove.keySet()) { + LOG.info("Removing Slot:{} upon resource failure", slotId); + if (freeSlots.containsKey(slotId)) { + freeSlots.remove(slotId); + } else if (allocationMap.isAllocated(slotId)) { + allocationMap.removeAllocation(slotId); + } else { + LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId); + } + } + } + } + + // ------------------------------------------------------------------------ + // internal behaviors + // ------------------------------------------------------------------------ + + /** + * Update slot status based on TaskManager's report. There are mainly two situations when we receive the report: + *

    + *
  • 1. The slot is newly registered.
  • + *
  • 2. The slot has already been registered, and the report contains its current status.
  • + *
+ *

+ * Regarding 1: It's fairly simple, we just record this slot's status, and trigger scheduling if the slot is free. + *

+ * Regarding 2: It will cause some weird situation since we may have some time-gap on how the slot's status really + * is. We may have some updates on the slot's allocation, but it doesn't reflected by TaskManager's heartbeat yet, + * and we may make some wrong decision if we cannot guarantee we have the exact status about all the slots. So + * the principle here is: We always trust TaskManager's heartbeat, we will correct our information based on that + * and take next action based on the diff between our information and heartbeat status. + * + * @param reportedStatus Reported slot status + */ + void updateSlotStatus(final SlotStatus reportedStatus) { + final SlotID slotId = reportedStatus.getSlotID(); + final ResourceSlot slot = new ResourceSlot(slotId, reportedStatus.getProfiler()); + + if (registerNewSlot(slot)) { + // we have a newly registered slot + LOG.info("New slot appeared, SlotID:{}, AllocationID:{}", slotId, reportedStatus.getAllocationID()); + + if (reportedStatus.getAllocationID() != null) { + // slot in use, record this in bookkeeping + allocationMap.addAllocation(slotId, reportedStatus.getAllocationID()); + } else { + handleFreeSlot(new ResourceSlot(slotId, reportedStatus.getProfiler())); + } + } else { + // slot exists, update current information + if (reportedStatus.getAllocationID() != null) { + // slot is reported in use + final AllocationID reportedAllocationId = reportedStatus.getAllocationID(); + + // check whether we also thought this slot is in use + if (allocationMap.isAllocated(slotId)) { + // we also think that slot is in use, check whether the AllocationID matches + final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId); + + if (!reportedAllocationId.equals(currentAllocationId)) { + LOG.info("Slot allocation info mismatch! 
SlotID:{}, current:{}, reported:{}", + slotId, currentAllocationId, reportedAllocationId); + + // seems we have a disagreement about the slot assignments, need to correct it + allocationMap.removeAllocation(slotId); + allocationMap.addAllocation(slotId, reportedAllocationId); + } + } else { + LOG.info("Slot allocation info mismatch! SlotID:{}, current:null, reported:{}", + slotId, reportedAllocationId); + + // we thought the slot is free, should correct this information + allocationMap.addAllocation(slotId, reportedStatus.getAllocationID()); + + // remove this slot from free slots pool + freeSlots.remove(slotId); + } + } else { + // slot is reported empty + + // check whether we also thought this slot is empty + if (allocationMap.isAllocated(slotId)) { + LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:null", + slotId, allocationMap.getAllocationID(slotId)); + + // we thought the slot is in use, correct it + allocationMap.removeAllocation(slotId); + + // we have a free slot! + handleFreeSlot(new ResourceSlot(slotId, reportedStatus.getProfiler())); + } + } + } + } + + /** + * When we have a free slot, try to fulfill the pending request first. If any request can be fulfilled, + * record this allocation in bookkeeping and send slot request to TaskManager, else we just add this slot + * to the free pool. 
+ * + * @param freeSlot The free slot + */ + private void handleFreeSlot(final ResourceSlot freeSlot) { + SlotRequest chosenRequest = chooseRequestToFulfill(freeSlot, pendingSlotRequests); + + if (chosenRequest != null) { + pendingSlotRequests.remove(chosenRequest.getAllocationId()); + + LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", freeSlot.getSlotId(), + chosenRequest.getAllocationId(), chosenRequest.getJobId()); + allocationMap.addAllocation(freeSlot.getSlotId(), chosenRequest.getAllocationId()); + + // TODO: send slot request to TaskManager + } else { + freeSlots.put(freeSlot.getSlotId(), freeSlot); + } + } + + /** + * Check whether the request is duplicated. We use AllocationID to identify slot request, for each + * formerly received slot request, it is either in pending list or already been allocated. + * + * @param request The slot request + * @return true if the request is duplicated + */ + private boolean isRequestDuplicated(final SlotRequest request) { + final AllocationID allocationId = request.getAllocationId(); + return pendingSlotRequests.containsKey(allocationId) + || allocationMap.isAllocated(allocationId); + } + + /** + * Try to register slot, and tell if this slot is newly registered. 
+ * + * @param slot The ResourceSlot which will be checked and registered + * @return true if we meet a new slot + */ + private boolean registerNewSlot(final ResourceSlot slot) { + final SlotID slotId = slot.getSlotId(); + final ResourceID resourceId = slotId.getResourceID(); + if (!registeredSlots.containsKey(resourceId)) { + registeredSlots.put(resourceId, new HashMap()); + } + return registeredSlots.get(resourceId).put(slotId, slot) == null; + } + + private ResourceSlot getRegisteredSlot(final SlotID slotId) { + final ResourceID resourceId = slotId.getResourceID(); + if (!registeredSlots.containsKey(resourceId)) { + return null; + } + return registeredSlots.get(resourceId).get(slotId); + } + + // ------------------------------------------------------------------------ + // Framework specific behavior + // ------------------------------------------------------------------------ + + /** + * Choose a slot to use among all free slots, the behavior is framework specified. + * + * @param request The slot request + * @param freeSlots All slots which can be used + * @return The slot we choose to use, null if we did not find a match + */ + protected abstract ResourceSlot chooseSlotToUse(final SlotRequest request, + final Map freeSlots); + + /** + * Choose a pending request to fulfill when we have a free slot, the behavior is framework specified. + * + * @param offeredSlot The free slot + * @param pendingRequests All the pending slot requests + * @return The chosen SlotRequest, null if we did not find a match + */ + protected abstract SlotRequest chooseRequestToFulfill(final ResourceSlot offeredSlot, + final Map pendingRequests); + + /** + * The framework specific code for allocating a container for specified resource profile. 
+ * + * @param resourceProfile The resource profile + */ + protected abstract void allocateContainer(final ResourceProfile resourceProfile); + + + // ------------------------------------------------------------------------ + // Helper classes + // ------------------------------------------------------------------------ + + /** + * We maintain all the allocations with SlotID and AllocationID. We are able to get or remove the allocation info + * either by SlotID or AllocationID. + */ + private static class AllocationMap { + + /** All allocated slots (by SlotID) */ + private final Map allocatedSlots; + + /** All allocated slots (by AllocationID), it'a a inverse view of allocatedSlots */ + private final Map allocatedSlotsByAllocationId; + + AllocationMap() { + this.allocatedSlots = new HashMap<>(16); + this.allocatedSlotsByAllocationId = new HashMap<>(16); + } + + /** + * Add a allocation + * + * @param slotId The slot id + * @param allocationId The allocation id + */ + void addAllocation(final SlotID slotId, final AllocationID allocationId) { + allocatedSlots.put(slotId, allocationId); + allocatedSlotsByAllocationId.put(allocationId, slotId); + } + + /** + * De-allocation with slot id + * + * @param slotId The slot id + */ + void removeAllocation(final SlotID slotId) { + if (allocatedSlots.containsKey(slotId)) { + final AllocationID allocationId = allocatedSlots.get(slotId); + allocatedSlots.remove(slotId); + allocatedSlotsByAllocationId.remove(allocationId); + } + } + + /** + * De-allocation with allocation id + * + * @param allocationId The allocation id + */ + void removeAllocation(final AllocationID allocationId) { + if (allocatedSlotsByAllocationId.containsKey(allocationId)) { + SlotID slotId = allocatedSlotsByAllocationId.get(allocationId); + allocatedSlotsByAllocationId.remove(allocationId); + allocatedSlots.remove(slotId); + } + } + + /** + * Check whether allocation exists by slot id + * + * @param slotId The slot id + * @return true if the allocation exists 
+ */ + boolean isAllocated(final SlotID slotId) { + return allocatedSlots.containsKey(slotId); + } + + /** + * Check whether allocation exists by allocation id + * + * @param allocationId The allocation id + * @return true if the allocation exists + */ + boolean isAllocated(final AllocationID allocationId) { + return allocatedSlotsByAllocationId.containsKey(allocationId); + } + + AllocationID getAllocationID(final SlotID slotId) { + return allocatedSlots.get(slotId); + } + + SlotID getSlotID(final AllocationID allocationId) { + return allocatedSlotsByAllocationId.get(allocationId); + } + + public int size() { + return allocatedSlots.size(); + } + } + + // ------------------------------------------------------------------------ + // Testing utilities + // ------------------------------------------------------------------------ + + @VisibleForTesting + boolean isAllocated(final SlotID slotId) { + return allocationMap.isAllocated(slotId); + } + + @VisibleForTesting + boolean isAllocated(final AllocationID allocationId) { + return allocationMap.isAllocated(allocationId); + } + + /** + * Add free slots directly to the free pool, this will not trigger pending requests allocation + * + * @param slot The resource slot + */ + @VisibleForTesting + void addFreeSlot(final ResourceSlot slot) { + final ResourceID resourceId = slot.getResourceID(); + final SlotID slotId = slot.getSlotId(); + + if (!registeredSlots.containsKey(resourceId)) { + registeredSlots.put(resourceId, new HashMap()); + } + registeredSlots.get(resourceId).put(slot.getSlotId(), slot); + freeSlots.put(slotId, slot); + } + + @VisibleForTesting + int getAllocatedSlotCount() { + return allocationMap.size(); + } + + @VisibleForTesting + int getFreeSlotCount() { + return freeSlots.size(); + } + + @VisibleForTesting + int getPendingRequestCount() { + return pendingSlotRequests.size(); + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/108f43f7/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java index 8cf9ccb..6b8a037 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java @@ -63,9 +63,7 @@ public final class ResourceID implements ResourceIDRetrievable, Serializable { @Override public String toString() { - return "ResourceID{" + - "resourceId='" + resourceId + '\'' + - '}'; + return resourceId; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/108f43f7/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java index cbe709f..ff1c4bf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java @@ -40,6 +40,11 @@ public class ResourceProfile implements Serializable { this.memoryInMB = memoryInMB; } + public ResourceProfile(ResourceProfile other) { + this.cpuCores = other.cpuCores; + this.memoryInMB = other.memoryInMB; + } + /** * Get the cpu cores needed * @return The cpu cores, 1.0 means a full cpu thread 
http://git-wip-us.apache.org/repos/asf/flink/blob/108f43f7/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java new file mode 100644 index 0000000..8a6db5f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.types; + +import java.io.Serializable; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A ResourceSlot represents a slot located in TaskManager from ResourceManager's view. It has a unique + * identification and resource profile which we can compare to the resource request. 
+ */ +public class ResourceSlot implements ResourceIDRetrievable, Serializable { + + private static final long serialVersionUID = -5853720153136840674L; + + /** The unique identification of this slot */ + private final SlotID slotId; + + /** The resource profile of this slot */ + private final ResourceProfile resourceProfile; + + public ResourceSlot(SlotID slotId, ResourceProfile resourceProfile) { + this.slotId = checkNotNull(slotId); + this.resourceProfile = checkNotNull(resourceProfile); + } + + @Override + public ResourceID getResourceID() { + return slotId.getResourceID(); + } + + public SlotID getSlotId() { + return slotId; + } + + public ResourceProfile getResourceProfile() { + return resourceProfile; + } + + /** + * Check whether required resource profile can be matched by this slot. + * + * @param required The required resource profile + * @return true if requirement can be matched + */ + public boolean isMatchingRequirement(ResourceProfile required) { + return resourceProfile.isMatching(required); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/108f43f7/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java index d1b072d..e831a5d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java @@ -75,9 +75,15 @@ public class SlotID implements ResourceIDRetrievable, Serializable { @Override public String toString() { - return "SlotID{" + - "resourceId=" + resourceId + - ", slotId=" + slotId + - '}'; + return resourceId + "_" + slotId; + } + + /** + * Generate a random slot id. + * + * @return A random slot id. 
+ */ + public static SlotID generate() { + return new SlotID(ResourceID.generate(), 0); } } http://git-wip-us.apache.org/repos/asf/flink/blob/108f43f7/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java index d8fe268..74c7c39 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java @@ -18,8 +18,57 @@ package org.apache.flink.runtime.rpc.resourcemanager; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; + import java.io.Serializable; -public class SlotRequest implements Serializable{ +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This describes the requirement of the slot, mainly used by JobManager requesting slot from ResourceManager. + */ +public class SlotRequest implements Serializable { + private static final long serialVersionUID = -6586877187990445986L; + + /** The JobID of the slot requested for */ + private final JobID jobId; + + /** The unique identification of this request */ + private final AllocationID allocationId; + + /** The resource profile of the required slot */ + private final ResourceProfile resourceProfile; + + public SlotRequest(JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile) { + this.jobId = checkNotNull(jobId); + this.allocationId = checkNotNull(allocationId); + this.resourceProfile = checkNotNull(resourceProfile); + } + + /** + * Get the JobID of the slot requested for. 
+ * @return The job id + */ + public JobID getJobId() { + return jobId; + } + + /** + * Get the unique identification of this request + * @return the allocation id + */ + public AllocationID getAllocationId() { + return allocationId; + } + + /** + * Get the resource profile of the desired slot + * @return The resource profile + */ + public ResourceProfile getResourceProfile() { + return resourceProfile; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/108f43f7/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java new file mode 100644 index 0000000..c372ecb --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.taskexecutor; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; + +import java.io.Serializable; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A report about the current status of all slots of the TaskExecutor, describing + * which slots are available and allocated, and what jobs (JobManagers) the allocated slots + * have been allocated to. + */ +public class SlotReport implements Serializable { + + private static final long serialVersionUID = -3150175198722481689L; + + /** The slots status of the TaskManager */ + private final List slotsStatus; + + /** The resource id which identifies the TaskManager */ + private final ResourceID resourceID; + + public SlotReport(final List slotsStatus, final ResourceID resourceID) { + this.slotsStatus = checkNotNull(slotsStatus); + this.resourceID = checkNotNull(resourceID); + } + + public List getSlotsStatus() { + return slotsStatus; + } + + public ResourceID getResourceID() { + return resourceID; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/108f43f7/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java new file mode 100644 index 0000000..e8e2084 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.taskexecutor; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; + +import java.io.Serializable; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This describes the slot current status which located in TaskManager. 
+ */ +public class SlotStatus implements Serializable { + + private static final long serialVersionUID = 5099191707339664493L; + + /** slotID to identify a slot */ + private final SlotID slotID; + + /** the resource profile of the slot */ + private final ResourceProfile profiler; + + /** if the slot is allocated, allocationId identify its allocation; else, allocationId is null */ + private final AllocationID allocationID; + + /** if the slot is allocated, jobId identify which job this slot is allocated to; else, jobId is null */ + private final JobID jobID; + + public SlotStatus(SlotID slotID, ResourceProfile profiler) { + this(slotID, profiler, null, null); + } + + public SlotStatus(SlotID slotID, ResourceProfile profiler, AllocationID allocationID, JobID jobID) { + this.slotID = checkNotNull(slotID, "slotID cannot be null"); + this.profiler = checkNotNull(profiler, "profile cannot be null"); + this.allocationID = allocationID; + this.jobID = jobID; + } + + /** + * Get the unique identification of this slot + * + * @return The slot id + */ + public SlotID getSlotID() { + return slotID; + } + + /** + * Get the resource profile of this slot + * + * @return The resource profile + */ + public ResourceProfile getProfiler() { + return profiler; + } + + /** + * Get the allocation id of this slot + * + * @return The allocation id if this slot is allocated, otherwise null + */ + public AllocationID getAllocationID() { + return allocationID; + } + + /** + * Get the job id of the slot allocated for + * + * @return The job id if this slot is allocated, otherwise null + */ + public JobID getJobID() { + return jobID; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SlotStatus that = (SlotStatus) o; + + if (!slotID.equals(that.slotID)) { + return false; + } + if (!profiler.equals(that.profiler)) { + return false; + } + if (allocationID != null ? 
!allocationID.equals(that.allocationID) : that.allocationID != null) { + return false; + } + return jobID != null ? jobID.equals(that.jobID) : that.jobID == null; + + } + + @Override + public int hashCode() { + int result = slotID.hashCode(); + result = 31 * result + profiler.hashCode(); + result = 31 * result + (allocationID != null ? allocationID.hashCode() : 0); + result = 31 * result + (jobID != null ? jobID.hashCode() : 0); + return result; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/108f43f7/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java new file mode 100644 index 0000000..2ee280f --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java @@ -0,0 +1,540 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.ResourceSlot; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest; +import org.apache.flink.runtime.rpc.taskexecutor.SlotStatus; +import org.junit.Before; +import org.junit.Test; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +public class SlotManagerTest { + + private static final double DEFAULT_TESTING_CPU_CORES = 1.0; + + private static final long DEFAULT_TESTING_MEMORY = 512; + + private static final ResourceProfile DEFAULT_TESTING_PROFILE = + new ResourceProfile(DEFAULT_TESTING_CPU_CORES, DEFAULT_TESTING_MEMORY); + + private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE = + new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, DEFAULT_TESTING_MEMORY * 2); + + private ResourceManagerGateway resourceManagerGateway; + + @Before + public void setUp() { + resourceManagerGateway = mock(ResourceManagerGateway.class); + } + + /** + * Tests that there are no free slots when we request, need to allocate from cluster manager master + */ + @Test + public void testRequestSlotWithoutFreeSlot() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); + + assertEquals(0, slotManager.getAllocatedSlotCount()); + 
assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(1, slotManager.getPendingRequestCount()); + assertEquals(1, slotManager.getAllocatedContainers().size()); + assertEquals(DEFAULT_TESTING_PROFILE, slotManager.getAllocatedContainers().get(0)); + } + + /** + * Tests that there are some free slots when we request, and the request is fulfilled immediately + */ + @Test + public void testRequestSlotWithFreeSlot() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + + directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1); + assertEquals(1, slotManager.getFreeSlotCount()); + + slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + assertEquals(0, slotManager.getAllocatedContainers().size()); + } + + /** + * Tests that there are some free slots when we request, but none of them are suitable + */ + @Test + public void testRequestSlotWithoutSuitableSlot() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + + directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 2); + assertEquals(2, slotManager.getFreeSlotCount()); + + slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE)); + assertEquals(0, slotManager.getAllocatedSlotCount()); + assertEquals(2, slotManager.getFreeSlotCount()); + assertEquals(1, slotManager.getPendingRequestCount()); + assertEquals(1, slotManager.getAllocatedContainers().size()); + assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0)); + } + + /** + * Tests that we send duplicated slot request + */ + @Test + public void testDuplicatedSlotRequest() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + directlyProvideFreeSlots(slotManager, 
DEFAULT_TESTING_PROFILE, 1); + + SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); + SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE); + + slotManager.requestSlot(request1); + slotManager.requestSlot(request2); + slotManager.requestSlot(request2); + slotManager.requestSlot(request1); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(1, slotManager.getPendingRequestCount()); + assertEquals(1, slotManager.getAllocatedContainers().size()); + assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0)); + } + + /** + * Tests that we send multiple slot requests + */ + @Test + public void testRequestMultipleSlots() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 5); + + // request 3 normal slots + for (int i = 0; i < 3; ++i) { + slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); + } + + // request 2 big slots + for (int i = 0; i < 2; ++i) { + slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE)); + } + + // request 1 normal slot again + slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); + + assertEquals(4, slotManager.getAllocatedSlotCount()); + assertEquals(1, slotManager.getFreeSlotCount()); + assertEquals(2, slotManager.getPendingRequestCount()); + assertEquals(2, slotManager.getAllocatedContainers().size()); + assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0)); + assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(1)); + } + + /** + * Tests that a new slot appeared in SlotReport, and we used it to fulfill a pending request + */ + @Test + public void 
testNewlyAppearedFreeSlotFulfillPendingRequest() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); + assertEquals(1, slotManager.getPendingRequestCount()); + + SlotID slotId = SlotID.generate(); + SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); + slotManager.updateSlotStatus(slotStatus); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + assertTrue(slotManager.isAllocated(slotId)); + } + + /** + * Tests that a new slot appeared in SlotReport, but we have no pending request + */ + @Test + public void testNewlyAppearedFreeSlot() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + SlotID slotId = SlotID.generate(); + SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); + slotManager.updateSlotStatus(slotStatus); + + assertEquals(0, slotManager.getAllocatedSlotCount()); + assertEquals(1, slotManager.getFreeSlotCount()); + } + + /** + * Tests that a new slot appeared in SlotReport, but it't not suitable for all the pending requests + */ + @Test + public void testNewlyAppearedFreeSlotNotMatchPendingRequests() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE)); + assertEquals(1, slotManager.getPendingRequestCount()); + + SlotID slotId = SlotID.generate(); + SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); + slotManager.updateSlotStatus(slotStatus); + + assertEquals(0, slotManager.getAllocatedSlotCount()); + assertEquals(1, slotManager.getFreeSlotCount()); + assertEquals(1, slotManager.getPendingRequestCount()); + assertFalse(slotManager.isAllocated(slotId)); + } + + /** + * Tests that 
a new slot appeared in SlotReport, and it's been reported using by some job + */ + @Test + public void testNewlyAppearedInUseSlot() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + + SlotID slotId = SlotID.generate(); + SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID()); + slotManager.updateSlotStatus(slotStatus); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertTrue(slotManager.isAllocated(slotId)); + } + + /** + * Tests that we had a slot in-use, and it's confirmed by SlotReport + */ + @Test + public void testExistingInUseSlotUpdateStatus() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); + slotManager.requestSlot(request); + + // make this slot in use + SlotID slotId = SlotID.generate(); + SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); + slotManager.updateSlotStatus(slotStatus); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertTrue(slotManager.isAllocated(slotId)); + + // slot status is confirmed + SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, + request.getAllocationId(), request.getJobId()); + slotManager.updateSlotStatus(slotStatus2); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertTrue(slotManager.isAllocated(slotId)); + } + + /** + * Tests that we had a slot in-use, but it's empty according to the SlotReport + */ + @Test + public void testExistingInUseSlotAdjustedToEmpty() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); + 
slotManager.requestSlot(request1); + + // make this slot in use + SlotID slotId = SlotID.generate(); + SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); + slotManager.updateSlotStatus(slotStatus); + + // another request pending + SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); + slotManager.requestSlot(request2); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(1, slotManager.getPendingRequestCount()); + assertTrue(slotManager.isAllocated(slotId)); + assertTrue(slotManager.isAllocated(request1.getAllocationId())); + + + // but slot is reported empty again, request2 will be fulfilled, request1 will be missing + slotManager.updateSlotStatus(slotStatus); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + assertTrue(slotManager.isAllocated(slotId)); + assertTrue(slotManager.isAllocated(request2.getAllocationId())); + } + + /** + * Tests that we had a slot in use, and it's also reported in use by TaskManager, but the allocation + * information didn't match. 
+ */ + @Test + public void testExistingInUseSlotWithDifferentAllocationInfo() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); + slotManager.requestSlot(request); + + // make this slot in use + SlotID slotId = SlotID.generate(); + SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); + slotManager.updateSlotStatus(slotStatus); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + assertTrue(slotManager.isAllocated(slotId)); + assertTrue(slotManager.isAllocated(request.getAllocationId())); + + SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID()); + // update slot status with different allocation info + slotManager.updateSlotStatus(slotStatus2); + + // original request is missing and won't be allocated + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + assertTrue(slotManager.isAllocated(slotId)); + assertFalse(slotManager.isAllocated(request.getAllocationId())); + assertTrue(slotManager.isAllocated(slotStatus2.getAllocationID())); + } + + /** + * Tests that we had a free slot, and it's confirmed by SlotReport + */ + @Test + public void testExistingEmptySlotUpdateStatus() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE); + slotManager.addFreeSlot(slot); + + SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE); + slotManager.updateSlotStatus(slotStatus); + + assertEquals(0, slotManager.getAllocatedSlotCount()); + assertEquals(1, slotManager.getFreeSlotCount()); + assertEquals(0, 
slotManager.getPendingRequestCount()); + } + + /** + * Tests that we had a free slot, and it's reported in-use by TaskManager + */ + @Test + public void testExistingEmptySlotAdjustedToInUse() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE); + slotManager.addFreeSlot(slot); + + SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE, + new AllocationID(), new JobID()); + slotManager.updateSlotStatus(slotStatus); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + assertTrue(slotManager.isAllocated(slot.getSlotId())); + } + + /** + * Tests that we did some allocation but failed / rejected by TaskManager, request will retry + */ + @Test + public void testSlotAllocationFailedAtTaskManager() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE); + slotManager.addFreeSlot(slot); + + SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); + slotManager.requestSlot(request); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + assertTrue(slotManager.isAllocated(slot.getSlotId())); + + slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId()); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + } + + + /** + * Tests that we did some allocation but failed / rejected by TaskManager, and slot is occupied by another request + */ + @Test + public void testSlotAllocationFailedAtTaskManagerOccupiedByOther() { + 
TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE); + slotManager.addFreeSlot(slot); + + SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); + slotManager.requestSlot(request); + + // slot is set empty by heartbeat + SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), slot.getResourceProfile()); + slotManager.updateSlotStatus(slotStatus); + + // another request took this slot + SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); + slotManager.requestSlot(request2); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + assertFalse(slotManager.isAllocated(request.getAllocationId())); + assertTrue(slotManager.isAllocated(request2.getAllocationId())); + + // original request should be pended + slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId()); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(1, slotManager.getPendingRequestCount()); + assertFalse(slotManager.isAllocated(request.getAllocationId())); + assertTrue(slotManager.isAllocated(request2.getAllocationId())); + } + + @Test + public void testNotifyTaskManagerFailure() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + + ResourceID resource1 = ResourceID.generate(); + ResourceID resource2 = ResourceID.generate(); + + ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 1), DEFAULT_TESTING_PROFILE); + ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 2), DEFAULT_TESTING_PROFILE); + ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 1), DEFAULT_TESTING_PROFILE); + ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 2), 
DEFAULT_TESTING_PROFILE); + + slotManager.addFreeSlot(slot11); + slotManager.addFreeSlot(slot21); + + slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); + slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); + + assertEquals(2, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + + slotManager.addFreeSlot(slot12); + slotManager.addFreeSlot(slot22); + + assertEquals(2, slotManager.getAllocatedSlotCount()); + assertEquals(2, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + + slotManager.notifyTaskManagerFailure(resource2); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(1, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + + // notify an not exist resource failure + slotManager.notifyTaskManagerFailure(ResourceID.generate()); + + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(1, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + } + + // ------------------------------------------------------------------------ + // testing utilities + // ------------------------------------------------------------------------ + + private void directlyProvideFreeSlots( + final SlotManager slotManager, + final ResourceProfile resourceProfile, + final int freeSlotNum) + { + for (int i = 0; i < freeSlotNum; ++i) { + slotManager.addFreeSlot(new ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile))); + } + } + + // ------------------------------------------------------------------------ + // testing classes + // ------------------------------------------------------------------------ + + private static class TestingSlotManager extends SlotManager { + + private final List allocatedContainers; + + 
TestingSlotManager(ResourceManagerGateway resourceManagerGateway) { + super(resourceManagerGateway); + this.allocatedContainers = new LinkedList<>(); + } + + /** + * Choose slot randomly if it matches requirement + * + * @param request The slot request + * @param freeSlots All slots which can be used + * @return The chosen slot or null if cannot find a match + */ + @Override + protected ResourceSlot chooseSlotToUse(SlotRequest request, Map freeSlots) { + for (ResourceSlot slot : freeSlots.values()) { + if (slot.isMatchingRequirement(request.getResourceProfile())) { + return slot; + } + } + return null; + } + + /** + * Choose request randomly if offered slot can match its requirement + * + * @param offeredSlot The free slot + * @param pendingRequests All the pending slot requests + * @return The chosen request's AllocationID or null if cannot find a match + */ + @Override + protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot, + Map pendingRequests) + { + for (Map.Entry pendingRequest : pendingRequests.entrySet()) { + if (offeredSlot.isMatchingRequirement(pendingRequest.getValue().getResourceProfile())) { + return pendingRequest.getValue(); + } + } + return null; + } + + @Override + protected void allocateContainer(ResourceProfile resourceProfile) { + allocatedContainers.add(resourceProfile); + } + + List getAllocatedContainers() { + return allocatedContainers; + } + } +}