Date: Thu, 16 Mar 2017 19:56:42 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Subject: [jira] [Commented] (FLINK-5810) Harden SlotManager

[ https://issues.apache.org/jira/browse/FLINK-5810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15928748#comment-15928748 ]

ASF GitHub Bot commented on FLINK-5810:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3394#discussion_r106512992

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
@@ -21,519 +21,897 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRegistered;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
-import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;

 /**
- * SlotManager is responsible for receiving slot requests and do slot allocations. It allows to request
- * slots from registered TaskManagers and issues container allocation requests in case of there are not
- * enough available slots. Besides, it should sync its slot allocation with TaskManager's heartbeat.
- * <p>
- * The main operation principle of SlotManager is:
- * <ul>
- * <li>1. All slot allocation status should be synced with TaskManager, which is the ground truth.</li>
- * <li>2. All slots that have registered must be tracked, either by free pool or allocated pool.</li>
- * <li>3. All slot requests will be handled by best efforts, there is no guarantee that one request will be
- * fulfilled in time or correctly allocated. Conflicts or timeout or some special error will happen, it should
- * be handled outside SlotManager. SlotManager will make each decision based on the information it currently
- * holds.</li>
- * </ul>
- * IMPORTANT: This class is Not Thread-safe.
+ * The slot manager is responsible for maintaining a view on all registered task manager slots,
+ * their allocation and all pending slot requests. Whenever a new slot is registered or and
+ * allocated slot is freed, then it tries to fulfill another pending slot request. Whenever there
+ * are not enough slots available the slot manager will notify the resource manager about it via
+ * {@link ResourceManagerActions#allocateResource(ResourceProfile)}.
+ *
+ * In order to free resources and avoid resource leaks, idling task managers (task managers whose
+ * slots are currently not used) and not fulfilled pending slot requests time out triggering their
+ * release and failure, respectively.
 */
-public abstract class SlotManager {
+public class SlotManager implements AutoCloseable {
+
+	private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class);
+
+	/** Scheduled executor for timeouts */
+	private final ScheduledExecutor scheduledExecutor;
+
+	/** Timeout for slot requests to the task manager */
+	private final Time taskManagerRequestTimeout;
+
+	/** Timeout after which an allocation is discarded */
+	private final Time slotRequestTimeout;
+
+	/** Timeout after which an unused TaskManager is released */
+	private final Time taskManagerTimeout;
+
+	/** Map for all registered slots */
+	private final HashMap slots;

-	protected final Logger LOG = LoggerFactory.getLogger(getClass());
+	/** Index of all currently free slots */
+	private final LinkedHashMap freeSlots;

-	/** The Resource allocation provider */
-	protected final ResourceManagerServices rmServices;
+	/** All currently registered task managers */
+	private final HashMap taskManagerRegistrations;

-	/** All registered task managers with ResourceID and gateway. */
-	private final Map taskManagers;
+	/** Map of fulfilled and active allocations for request deduplication purposes */
+	private final HashMap fulfilledSlotRequests;

-	/** All registered slots, including free and allocated slots */
-	private final Map> registeredSlots;
+	/** Map of pending/unfulfilled slot allocation requests */
+	private final HashMap pendingSlotRequests;

-	/** All pending slot requests, waiting available slots to fulfil */
-	private final Map pendingSlotRequests;
+	/** Leader id of the containing component */
+	private UUID leaderId;

-	/** All free slots that can be used to be allocated */
-	private final Map freeSlots;
+	/** Executor for future callbacks which have to be "synchronized" */
+	private Executor mainThreadExecutor;

-	/** All allocations, we can lookup allocations either by SlotID or AllocationID */
-	private final AllocationMap allocationMap;
+	/** Callbacks for resource (de-)allocations */
+	private ResourceManagerActions resourceManagerActions;

-	private final Time timeout;
+	/** True iff the component has been started */
+	private boolean started;

-	public SlotManager(ResourceManagerServices rmServices) {
-		this.rmServices = checkNotNull(rmServices);
-		this.registeredSlots = new HashMap<>(16);
-		this.pendingSlotRequests = new LinkedHashMap<>(16);
-		this.freeSlots = new HashMap<>(16);
-		this.allocationMap = new AllocationMap();
-		this.taskManagers = new HashMap<>();
-		this.timeout = Time.seconds(10);
+	public SlotManager(
+			ScheduledExecutor scheduledExecutor,
+			Time taskManagerRequestTimeout,
+			Time slotRequestTimeout,
+			Time taskManagerTimeout) {
+		this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
+		this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
+		this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
+		this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);
+
+		slots = new HashMap<>(16);
+		freeSlots = new LinkedHashMap<>(16);
+		taskManagerRegistrations = new HashMap<>(4);
+		fulfilledSlotRequests = new HashMap<>(16);
+		pendingSlotRequests = new HashMap<>(16);
+
+		leaderId = null;
+		resourceManagerActions = null;
+		started = false;
 	}

-	// ------------------------------------------------------------------------
-	// slot managements
-	// ------------------------------------------------------------------------
+	// ---------------------------------------------------------------------------------------------
+	// Component lifecycle methods
+	// ---------------------------------------------------------------------------------------------

 	/**
-	 * Request a slot with requirements, we may either fulfill the request or pending it. Trigger container
-	 * allocation if we don't have enough resource. If we have free slot which can match the request, record
-	 * this allocation and forward the request to TaskManager through ResourceManager (we want this done by
-	 * RPC's main thread to avoid race condition).
+	 * Starts the slot manager with the given leader id and resource manager actions.
 	 *
-	 * @param request The detailed request of the slot
-	 * @return RMSlotRequestRegistered The confirmation message to be send to the caller
+	 * @param newLeaderId to use for communication with the task managers
+	 * @param newResourceManagerActions to use for resource (de-)allocations
+	 */
+	public void start(UUID newLeaderId, Executor newMainThreadExecutor, ResourceManagerActions newResourceManagerActions) {
+		leaderId = Preconditions.checkNotNull(newLeaderId);
+		mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
+		resourceManagerActions = Preconditions.checkNotNull(newResourceManagerActions);
+
+		started = true;
+	}
+
+	/**
+	 * Suspends the component. This clears the internal state of the slot manager.
 	 */
-	public RMSlotRequestRegistered requestSlot(final SlotRequest request) {
-		final AllocationID allocationId = request.getAllocationId();
-		if (isRequestDuplicated(request)) {
-			LOG.warn("Duplicated slot request, AllocationID:{}", allocationId);
-			return new RMSlotRequestRegistered(allocationId);
+	public void suspend() {
+		for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {
+			cancelPendingSlotRequest(pendingSlotRequest);
 		}

-		// try to fulfil the request with current free slots
-		final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
-		if (slot != null) {
-			LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
-				allocationId, request.getJobId());
+		pendingSlotRequests.clear();
+
+		HashSet registeredTaskManagers = new HashSet<>(taskManagerRegistrations.keySet());
+
+		for (InstanceID registeredTaskManager : registeredTaskManagers) {
+			unregisterTaskManager(registeredTaskManager);
+		}
+
+		leaderId = null;
+		resourceManagerActions = null;
+		started = false;
+	}
+
+	/**
+	 * Closes the slot manager.
+	 *
+	 * @throws Exception if the close operation fails
+	 */
+	@Override
+	public void close() throws Exception {
+		suspend();
+	}
+
+	// ---------------------------------------------------------------------------------------------
+	// Public API
+	// ---------------------------------------------------------------------------------------------
+
+	/**
+	 * Requests a slot with the respective resource profile.
+	 *
+	 * @param slotRequest specifying the requested slot specs
+	 * @return true if the slot request was registered; false if the request is a duplicate
+	 * @throws SlotManagerException if the slot request failed (e.g. not enough resources left)
+	 */
+	public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException {
+		checkInit();

-			// record this allocation in bookkeeping
-			allocationMap.addAllocation(slot.getSlotId(), allocationId);
-			// remove selected slot from free pool
-			freeSlots.remove(slot.getSlotId());
+		if (checkDuplicateRequest(slotRequest.getAllocationId())) {
+			LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId());

-			sendSlotRequest(slot, request);
+			return false;
 		} else {
-			LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " +
-				"AllocationID:{}, JobID:{}", allocationId, request.getJobId());
-			Preconditions.checkState(rmServices != null,
-				"Attempted to allocate resources but no ResourceManagerServices set.");
-			rmServices.allocateResource(request.getResourceProfile());
-			pendingSlotRequests.put(allocationId, request);
+			PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);
+
+			pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);
+
+			try {
+				internalRequestSlot(pendingSlotRequest);
+			} catch (ResourceManagerException e) {
+				// requesting the slot failed --> remove pending slot request
+				pendingSlotRequests.remove(slotRequest.getAllocationId());
+
+				throw new SlotManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
+			}
+
+			return true;
 		}
+	}

-		return new RMSlotRequestRegistered(allocationId);
+	/**
+	 * Cancels and removes a pending slot request with the given allocation id. If there is no such
+	 * pending request, then nothing is done.
+	 *
+	 * @param allocationId identifying the pending slot request
+	 * @return True if a pending slot request was found; otherwise false
+	 */
+	public boolean unregisterSlotRequest(AllocationID allocationId) {
+		checkInit();
+
+		PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId);
+
+		if (null != pendingSlotRequest) {
+			cancelPendingSlotRequest(pendingSlotRequest);
+
+			return true;
+		} else {
+			LOG.debug("No pending slot request with allocation id {} found.", allocationId);
+
+			return false;
+		}
 	}

 	/**
-	 * Notifies the SlotManager that a slot is available again after being allocated.
-	 * @param slotID slot id of available slot
+	 * Registers a new task manager at the slot manager. This will make the task managers slots
+	 * known and, thus, available for allocation.
+	 *
+	 * @param taskExecutorConnection for the new task manager
+	 * @param initialSlotReport for the new task manager
 	 */
-	public void notifySlotAvailable(ResourceID resourceID, SlotID slotID) {
-		if (!allocationMap.isAllocated(slotID)) {
-			throw new IllegalStateException("Slot was not previously allocated but " +
-				"TaskManager reports it as available again");
+	public void registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
+		checkInit();
+
+		// we identify task managers by their instance id
+		if (!taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
+			TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(taskExecutorConnection);
+			taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskManagerRegistration);
 		}
-		allocationMap.removeAllocation(slotID);
-		final Map slots = registeredSlots.get(resourceID);
-		ResourceSlot freeSlot = slots.get(slotID);
-		if (freeSlot == null) {
-			throw new IllegalStateException("Slot was not registered with SlotManager but " +
-				"TaskManager reported it to be available.");
+		}
+
+		reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
+	}
+
+	/**
+	 * Unregisters the task manager identified by the given instance id and its associated slots
+	 * from the slot manager.
+	 *
+	 * @param instanceId identifying the task manager to unregister
+	 * @return True if there existed a registered task manager with the given instance id
+	 */
+	public boolean unregisterTaskManager(InstanceID instanceId) {
+		checkInit();
+
+		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.remove(instanceId);
+
+		if (null != taskManagerRegistration) {
+			removeSlots(taskManagerRegistration.getSlots());
+
+			taskManagerRegistration.cancelTimeout();
+
+			return true;
+		} else {
+			LOG.debug("There is no task manager registered with instance ID {}. Ignoring this message.", instanceId);
+
+			return false;
 		}
-		handleFreeSlot(freeSlot);
 	}

 	/**
-	 * The slot request to TaskManager may be either failed by rpc communication (timeout, network error, etc.)
-	 * or really rejected by TaskManager. We shall retry this request by:
-	 * <ul>
-	 * <li>1. verify and clear all the previous allocate information for this request
-	 * <li>2. try to request slot again
-	 * </ul>
-	 * <p>
-	 * This may cause some duplicate allocation, e.g. the slot request to TaskManager is successful but the response
-	 * is lost somehow, so we may request a slot in another TaskManager, this causes two slots assigned to one request,
-	 * but it can be taken care of by rejecting registration at JobManager.
+	 * Reports the current slot allocations for a task manager identified by the given instance id.
 	 *
-	 * @param originalRequest The original slot request
-	 * @param slotId The target SlotID
+	 * @param instanceId identifying the task manager for which to report the slot status
+	 * @param slotReport containing the status for all of its slots
+	 * @return true if the slot status has been updated successfully, otherwise false
 	 */
-	void handleSlotRequestFailedAtTaskManager(final SlotRequest originalRequest, final SlotID slotId) {
-		final AllocationID originalAllocationId = originalRequest.getAllocationId();
-		LOG.info("Slot request failed at TaskManager, SlotID:{}, AllocationID:{}, JobID:{}",
-			slotId, originalAllocationId, originalRequest.getJobId());
-
-		if (allocationMap.isAllocated(slotId)) {
-			final AllocationID expectedAllocationId = allocationMap.getAllocationID(slotId);
-
-			// check whether we have an agreement on whom this slot belongs to
-			if (originalAllocationId.equals(expectedAllocationId)) {
-				LOG.info("De-allocate this request and retry");
-				allocationMap.removeAllocation(expectedAllocationId);
-				pendingSlotRequests.put(originalRequest.getAllocationId(), originalRequest);
-				ResourceSlot slot = checkNotNull(getRegisteredSlot(slotId));
-				// treat this slot as empty and retry with a different request
-				handleFreeSlot(slot);
+	public boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) {
+		checkInit();
+
+		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);
+
+		if (null != taskManagerRegistration) {
+			HashSet slotsToRemove = new HashSet<>(taskManagerRegistration.getSlots());
+			boolean idle = true;
+
+			for (SlotStatus slotStatus : slotReport) {
+				if (slotsToRemove.remove(slotStatus.getSlotID())) {
+					// slot which was already registered
+					updateSlot(slotStatus.getSlotID(), slotStatus.getAllocationID());
+				} else {
+					// new slot
+					registerSlot(
+						instanceId,
+						slotStatus.getSlotID(),
+						slotStatus.getAllocationID(),
+						slotStatus.getResourceProfile(),
+						taskManagerRegistration.getTaskManagerConnection());
+				}
+
+				TaskManagerSlot slot = slots.get(slotStatus.getSlotID());
+
+				idle &= slot.isFree();
+			}
+
+			// remove the slots for which we haven't received a slot status message
+			removeSlots(slotsToRemove);
+
+			if (idle) {
--- End diff --

I assume this breaks the TaskManager timeouts. Every time a heartbeat comes (with a slot report), the TaskManager is detected to be idle, and the timeout is scheduled, overriding the previous timeout (pushing the timeout further back).


> Harden SlotManager
> ------------------
>
> Key: FLINK-5810
> URL: https://issues.apache.org/jira/browse/FLINK-5810
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination
> Affects Versions: 1.3.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
>
> Harden the {{SlotManager}} logic to better cope with lost messages.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
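The concern raised in the review comment can be illustrated with a small Java sketch. This is a simplification, and several assumptions apply:

- `IdleTimeoutSketch`, `onSlotReport`, and `isTimedOut` are hypothetical names. They are not part of Flink's `SlotManager` or `TaskManagerRegistration` API.
- The scheduled timeout is modeled as a deadline in virtual milliseconds, so the behavior can be checked without sleeping.

With `rearmOnEveryReport = true`, every idle slot report pushes the deadline back. As a result, a TaskManager that reports more often than the timeout is never released. With `false`, the timeout is armed only on the busy-to-idle transition and cleared when a report shows allocated slots. That is one possible fix, not necessarily the one the pull request adopted.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch (not Flink API) of per-TaskManager idle timeouts.
 * Time is passed in explicitly as virtual milliseconds; a real implementation
 * would schedule and cancel a task on an executor instead.
 */
class IdleTimeoutSketch {

    private final long timeoutMillis;
    // true mimics the behavior questioned in the review: every idle report re-arms the timeout
    private final boolean rearmOnEveryReport;
    // idle deadline per TaskManager id; absent while the TaskManager has allocated slots
    private final Map<String, Long> idleDeadlines = new HashMap<>();

    IdleTimeoutSketch(long timeoutMillis, boolean rearmOnEveryReport) {
        this.timeoutMillis = timeoutMillis;
        this.rearmOnEveryReport = rearmOnEveryReport;
    }

    /** Called for every slot report, e.g. one piggybacked on a heartbeat. */
    void onSlotReport(String taskManagerId, boolean idle, long nowMillis) {
        if (!idle) {
            // the TaskManager is in use again: drop any pending idle timeout
            idleDeadlines.remove(taskManagerId);
        } else if (rearmOnEveryReport || !idleDeadlines.containsKey(taskManagerId)) {
            // the transition-only variant arms the timeout just once, when the TaskManager becomes idle
            idleDeadlines.put(taskManagerId, nowMillis + timeoutMillis);
        }
    }

    /** True if the TaskManager has been idle long enough to be released. */
    boolean isTimedOut(String taskManagerId, long nowMillis) {
        Long deadline = idleDeadlines.get(taskManagerId);
        return deadline != null && nowMillis >= deadline;
    }

    public static void main(String[] args) {
        for (boolean rearm : new boolean[] {true, false}) {
            IdleTimeoutSketch tracker = new IdleTimeoutSketch(100, rearm);
            // an idle TaskManager that sends a slot report every 50 ms
            for (long t = 0; t <= 300; t += 50) {
                tracker.onSlotReport("tm-1", true, t);
            }
            System.out.println("rearmOnEveryReport=" + rearm
                + " timedOut=" + tracker.isTimedOut("tm-1", 300));
        }
    }
}
```

Running `main` prints `rearmOnEveryReport=true timedOut=false` and then `rearmOnEveryReport=false timedOut=true`. With a real executor, the same idea would mean two things:

- Schedule a timeout only when none is already pending for that TaskManager.
- Cancel the pending timeout as soon as one of its slots becomes allocated.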