flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5810) Harden SlotManager
Date Thu, 16 Mar 2017 19:56:42 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15928748#comment-15928748
] 

ASF GitHub Bot commented on FLINK-5810:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3394#discussion_r106512992
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
---
    @@ -21,519 +21,897 @@
     import org.apache.flink.annotation.VisibleForTesting;
     import org.apache.flink.api.common.time.Time;
     import org.apache.flink.runtime.clusterframework.types.AllocationID;
    -import org.apache.flink.runtime.clusterframework.types.ResourceID;
    -import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
    +import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
    +import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
     import org.apache.flink.runtime.clusterframework.types.SlotID;
     import org.apache.flink.runtime.concurrent.BiFunction;
    +import org.apache.flink.runtime.concurrent.CompletableFuture;
     import org.apache.flink.runtime.concurrent.Future;
    -import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
    +import org.apache.flink.runtime.concurrent.ScheduledExecutor;
    +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.Acknowledge;
     import org.apache.flink.runtime.resourcemanager.SlotRequest;
    -import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRegistered;
    -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
    -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
    -import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
    +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
    +import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
     import org.apache.flink.runtime.taskexecutor.SlotReport;
     import org.apache.flink.runtime.taskexecutor.SlotStatus;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
    +import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
    +import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
     import org.apache.flink.util.Preconditions;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
     import java.util.LinkedHashMap;
     import java.util.Map;
    -
    -import static org.apache.flink.util.Preconditions.checkNotNull;
    +import java.util.Objects;
    +import java.util.UUID;
    +import java.util.concurrent.CancellationException;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.ScheduledFuture;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
     
     /**
    - * SlotManager is responsible for receiving slot requests and do slot allocations. It
allows to request
    - * slots from registered TaskManagers and issues container allocation requests in case
of there are not
    - * enough available slots. Besides, it should sync its slot allocation with TaskManager's
heartbeat.
    - * <p>
    - * The main operation principle of SlotManager is:
    - * <ul>
    - * <li>1. All slot allocation status should be synced with TaskManager, which is
the ground truth.</li>
    - * <li>2. All slots that have registered must be tracked, either by free pool or
allocated pool.</li>
    - * <li>3. All slot requests will be handled by best efforts, there is no guarantee
that one request will be
    - * fulfilled in time or correctly allocated. Conflicts or timeout or some special error
will happen, it should
    - * be handled outside SlotManager. SlotManager will make each decision based on the information
it currently
    - * holds.</li>
    - * </ul>
    - * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
    + * The slot manager is responsible for maintaining a view on all registered task manager
slots,
    + * their allocation and all pending slot requests. Whenever a new slot is registered
or and
    + * allocated slot is freed, then it tries to fulfill another pending slot request. Whenever
there
    + * are not enough slots available the slot manager will notify the resource manager about
it via
    + * {@link ResourceManagerActions#allocateResource(ResourceProfile)}.
    + *
    + * In order to free resources and avoid resource leaks, idling task managers (task managers
whose
    + * slots are currently not used) and not fulfilled pending slot requests time out triggering
their
    + * release and failure, respectively.
      */
    -public abstract class SlotManager {
    +public class SlotManager implements AutoCloseable {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class);
    +
    +	/** Scheduled executor for timeouts */
    +	private final ScheduledExecutor scheduledExecutor;
    +
    +	/** Timeout for slot requests to the task manager */
    +	private final Time taskManagerRequestTimeout;
    +
    +	/** Timeout after which an allocation is discarded */
    +	private final Time slotRequestTimeout;
    +
    +	/** Timeout after which an unused TaskManager is released */
    +	private final Time taskManagerTimeout;
    +
    +	/** Map for all registered slots */
    +	private final HashMap<SlotID, TaskManagerSlot> slots;
     
    -	protected final Logger LOG = LoggerFactory.getLogger(getClass());
    +	/** Index of all currently free slots */
    +	private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;
     
    -	/** The Resource allocation provider */
    -	protected final ResourceManagerServices rmServices;
    +	/** All currently registered task managers */
    +	private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations;
     
    -	/** All registered task managers with ResourceID and gateway. */
    -	private final Map<ResourceID, TaskExecutorRegistration> taskManagers;
    +	/** Map of fulfilled and active allocations for request deduplication purposes */
    +	private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;
     
    -	/** All registered slots, including free and allocated slots */
    -	private final Map<ResourceID, Map<SlotID, ResourceSlot>> registeredSlots;
    +	/** Map of pending/unfulfilled slot allocation requests */
    +	private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests;
     
    -	/** All pending slot requests, waiting available slots to fulfil */
    -	private final Map<AllocationID, SlotRequest> pendingSlotRequests;
    +	/** Leader id of the containing component */
    +	private UUID leaderId;
     
    -	/** All free slots that can be used to be allocated */
    -	private final Map<SlotID, ResourceSlot> freeSlots;
    +	/** Executor for future callbacks which have to be "synchronized" */
    +	private Executor mainThreadExecutor;
     
    -	/** All allocations, we can lookup allocations either by SlotID or AllocationID */
    -	private final AllocationMap allocationMap;
    +	/** Callbacks for resource (de-)allocations */
    +	private ResourceManagerActions resourceManagerActions;
     
    -	private final Time timeout;
    +	/** True iff the component has been started */
    +	private boolean started;
     
    -	public SlotManager(ResourceManagerServices rmServices) {
    -		this.rmServices = checkNotNull(rmServices);
    -		this.registeredSlots = new HashMap<>(16);
    -		this.pendingSlotRequests = new LinkedHashMap<>(16);
    -		this.freeSlots = new HashMap<>(16);
    -		this.allocationMap = new AllocationMap();
    -		this.taskManagers = new HashMap<>();
    -		this.timeout = Time.seconds(10);
    +	public SlotManager(
    +			ScheduledExecutor scheduledExecutor,
    +			Time taskManagerRequestTimeout,
    +			Time slotRequestTimeout,
    +			Time taskManagerTimeout) {
    +		this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
    +		this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
    +		this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
    +		this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);
    +
    +		slots = new HashMap<>(16);
    +		freeSlots = new LinkedHashMap<>(16);
    +		taskManagerRegistrations = new HashMap<>(4);
    +		fulfilledSlotRequests = new HashMap<>(16);
    +		pendingSlotRequests = new HashMap<>(16);
    +
    +		leaderId = null;
    +		resourceManagerActions = null;
    +		started = false;
     	}
     
    -	// ------------------------------------------------------------------------
    -	//  slot managements
    -	// ------------------------------------------------------------------------
    +	// ---------------------------------------------------------------------------------------------
    +	// Component lifecycle methods
    +	// ---------------------------------------------------------------------------------------------
     
     	/**
    -	 * Request a slot with requirements, we may either fulfill the request or pending it.
Trigger container
    -	 * allocation if we don't have enough resource. If we have free slot which can match
the request, record
    -	 * this allocation and forward the request to TaskManager through ResourceManager (we
want this done by
    -	 * RPC's main thread to avoid race condition).
    +	 * Starts the slot manager with the given leader id and resource manager actions.
     	 *
    -	 * @param request The detailed request of the slot
    -	 * @return RMSlotRequestRegistered The confirmation message to be send to the caller
    +	 * @param newLeaderId to use for communication with the task managers
    +	 * @param newResourceManagerActions to use for resource (de-)allocations
    +	 */
    +	public void start(UUID newLeaderId, Executor newMainThreadExecutor, ResourceManagerActions
newResourceManagerActions) {
    +		leaderId = Preconditions.checkNotNull(newLeaderId);
    +		mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
    +		resourceManagerActions = Preconditions.checkNotNull(newResourceManagerActions);
    +
    +		started = true;
    +	}
    +
    +	/**
    +	 * Suspends the component. This clears the internal state of the slot manager.
     	 */
    -	public RMSlotRequestRegistered requestSlot(final SlotRequest request) {
    -		final AllocationID allocationId = request.getAllocationId();
    -		if (isRequestDuplicated(request)) {
    -			LOG.warn("Duplicated slot request, AllocationID:{}", allocationId);
    -			return new RMSlotRequestRegistered(allocationId);
    +	public void suspend() {
    +		for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {
    +			cancelPendingSlotRequest(pendingSlotRequest);
     		}
     
    -		// try to fulfil the request with current free slots
    -		final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
    -		if (slot != null) {
    -			LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
    -				allocationId, request.getJobId());
    +		pendingSlotRequests.clear();
    +
    +		HashSet<InstanceID> registeredTaskManagers = new HashSet<>(taskManagerRegistrations.keySet());
    +
    +		for (InstanceID registeredTaskManager : registeredTaskManagers) {
    +			unregisterTaskManager(registeredTaskManager);
    +		}
    +
    +		leaderId = null;
    +		resourceManagerActions = null;
    +		started = false;
    +	}
    +
    +	/**
    +	 * Closes the slot manager.
    +	 *
    +	 * @throws Exception if the close operation fails
    +	 */
    +	@Override
    +	public void close() throws Exception {
    +		suspend();
    +	}
    +
    +	// ---------------------------------------------------------------------------------------------
    +	// Public API
    +	// ---------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Requests a slot with the respective resource profile.
    +	 *
    +	 * @param slotRequest specifying the requested slot specs
    +	 * @return true if the slot request was registered; false if the request is a duplicate
    +	 * @throws SlotManagerException if the slot request failed (e.g. not enough resources
left)
    +	 */
    +	public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException
{
    +		checkInit();
     
    -			// record this allocation in bookkeeping
    -			allocationMap.addAllocation(slot.getSlotId(), allocationId);
    -			// remove selected slot from free pool
    -			freeSlots.remove(slot.getSlotId());
    +		if (checkDuplicateRequest(slotRequest.getAllocationId())) {
    +			LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId());
     
    -			sendSlotRequest(slot, request);
    +			return false;
     		} else {
    -			LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " +
    -				"AllocationID:{}, JobID:{}", allocationId, request.getJobId());
    -			Preconditions.checkState(rmServices != null,
    -				"Attempted to allocate resources but no ResourceManagerServices set.");
    -			rmServices.allocateResource(request.getResourceProfile());
    -			pendingSlotRequests.put(allocationId, request);
    +			PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);
    +
    +			pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);
    +
    +			try {
    +				internalRequestSlot(pendingSlotRequest);
    +			} catch (ResourceManagerException e) {
    +				// requesting the slot failed --> remove pending slot request
    +				pendingSlotRequests.remove(slotRequest.getAllocationId());
    +
    +				throw new SlotManagerException("Could not fulfill slot request " + slotRequest.getAllocationId()
+ '.', e);
    +			}
    +
    +			return true;
     		}
    +	}
     
    -		return new RMSlotRequestRegistered(allocationId);
    +	/**
    +	 * Cancels and removes a pending slot request with the given allocation id. If there
is no such
    +	 * pending request, then nothing is done.
    +	 *
    +	 * @param allocationId identifying the pending slot request
    +	 * @return True if a pending slot request was found; otherwise false
    +	 */
    +	public boolean unregisterSlotRequest(AllocationID allocationId) {
    +		checkInit();
    +
    +		PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId);
    +
    +		if (null != pendingSlotRequest) {
    +			cancelPendingSlotRequest(pendingSlotRequest);
    +
    +			return true;
    +		} else {
    +			LOG.debug("No pending slot request with allocation id {} found.", allocationId);
    +
    +			return false;
    +		}
     	}
     
     	/**
    -	 * Notifies the SlotManager that a slot is available again after being allocated.
    -	 * @param slotID slot id of available slot
    +	 * Registers a new task manager at the slot manager. This will make the task managers
slots
    +	 * known and, thus, available for allocation.
    +	 *
    +	 * @param taskExecutorConnection for the new task manager
    +	 * @param initialSlotReport for the new task manager
     	 */
    -	public void notifySlotAvailable(ResourceID resourceID, SlotID slotID) {
    -		if (!allocationMap.isAllocated(slotID)) {
    -			throw new IllegalStateException("Slot was not previously allocated but " +
    -				"TaskManager reports it as available again");
    +	public void registerTaskManager(final TaskExecutorConnection taskExecutorConnection,
SlotReport initialSlotReport) {
    +		checkInit();
    +
    +		// we identify task managers by their instance id
    +		if (!taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID()))
{
    +			TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(taskExecutorConnection);
    +			taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskManagerRegistration);
     		}
    -		allocationMap.removeAllocation(slotID);
    -		final Map<SlotID, ResourceSlot> slots = registeredSlots.get(resourceID);
    -		ResourceSlot freeSlot = slots.get(slotID);
    -		if (freeSlot == null) {
    -			throw new IllegalStateException("Slot was not registered with SlotManager but " +
    -				"TaskManager reported it to be available.");
    +
    +		reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
    +	}
    +
    +	/**
    +	 * Unregisters the task manager identified by the given instance id and its associated
slots
    +	 * from the slot manager.
    +	 *
    +	 * @param instanceId identifying the task manager to unregister
    +	 * @return True if there existed a registered task manager with the given instance id
    +	 */
    +	public boolean unregisterTaskManager(InstanceID instanceId) {
    +		checkInit();
    +
    +		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.remove(instanceId);
    +
    +		if (null != taskManagerRegistration) {
    +			removeSlots(taskManagerRegistration.getSlots());
    +
    +			taskManagerRegistration.cancelTimeout();
    +
    +			return true;
    +		} else {
    +			LOG.debug("There is no task manager registered with instance ID {}. Ignoring this
message.", instanceId);
    +
    +			return false;
     		}
    -		handleFreeSlot(freeSlot);
     	}
     
     	/**
    -	 * The slot request to TaskManager may be either failed by rpc communication (timeout,
network error, etc.)
    -	 * or really rejected by TaskManager. We shall retry this request by:
    -	 * <ul>
    -	 * <li>1. verify and clear all the previous allocate information for this request
    -	 * <li>2. try to request slot again
    -	 * </ul>
    -	 * <p>
    -	 * This may cause some duplicate allocation, e.g. the slot request to TaskManager is
successful but the response
    -	 * is lost somehow, so we may request a slot in another TaskManager, this causes two
slots assigned to one request,
    -	 * but it can be taken care of by rejecting registration at JobManager.
    +	 * Reports the current slot allocations for a task manager identified by the given instance
id.
     	 *
    -	 * @param originalRequest The original slot request
    -	 * @param slotId          The target SlotID
    +	 * @param instanceId identifying the task manager for which to report the slot status
    +	 * @param slotReport containing the status for all of its slots
    +	 * @return true if the slot status has been updated successfully, otherwise false
     	 */
    -	void handleSlotRequestFailedAtTaskManager(final SlotRequest originalRequest, final SlotID
slotId) {
    -		final AllocationID originalAllocationId = originalRequest.getAllocationId();
    -		LOG.info("Slot request failed at TaskManager, SlotID:{}, AllocationID:{}, JobID:{}",
    -			slotId, originalAllocationId, originalRequest.getJobId());
    -
    -		if (allocationMap.isAllocated(slotId)) {
    -			final AllocationID expectedAllocationId = allocationMap.getAllocationID(slotId);
    -
    -			// check whether we have an agreement on whom this slot belongs to
    -			if (originalAllocationId.equals(expectedAllocationId)) {
    -				LOG.info("De-allocate this request and retry");
    -				allocationMap.removeAllocation(expectedAllocationId);
    -				pendingSlotRequests.put(originalRequest.getAllocationId(), originalRequest);
    -				ResourceSlot slot = checkNotNull(getRegisteredSlot(slotId));
    -				// treat this slot as empty and retry with a different request
    -				handleFreeSlot(slot);
    +	public boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) {
    +		checkInit();
    +
    +		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);
    +
    +		if (null != taskManagerRegistration) {
    +			HashSet<SlotID> slotsToRemove = new HashSet<>(taskManagerRegistration.getSlots());
    +			boolean idle = true;
    +
    +			for (SlotStatus slotStatus : slotReport) {
    +				if (slotsToRemove.remove(slotStatus.getSlotID())) {
    +					// slot which was already registered
    +					updateSlot(slotStatus.getSlotID(), slotStatus.getAllocationID());
    +				} else {
    +					// new slot
    +					registerSlot(
    +						instanceId,
    +						slotStatus.getSlotID(),
    +						slotStatus.getAllocationID(),
    +						slotStatus.getResourceProfile(),
    +						taskManagerRegistration.getTaskManagerConnection());
    +				}
    +
    +				TaskManagerSlot slot = slots.get(slotStatus.getSlotID());
    +
    +				idle &= slot.isFree();
    +			}
    +
    +			// remove the slots for which we haven't received a slot status message
    +			removeSlots(slotsToRemove);
    +
    +			if (idle) {
    --- End diff --
    
    I assume this breaks the TaskManager timeouts. Every time a heartbeat comes (with a slot
report), the TaskManager is detected to be idle, and the timeout is scheduled, overriding
the previous timeout (pushing the timeout further back).


> Harden SlotManager
> ------------------
>
>                 Key: FLINK-5810
>                 URL: https://issues.apache.org/jira/browse/FLINK-5810
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination
>    Affects Versions: 1.3.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>
> Harden the {{SlotManager}} logic to better cope with lost messages.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message