flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5810) Harden SlotManager
Date Wed, 26 Apr 2017 16:38:04 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15985104#comment-15985104
] 

ASF GitHub Bot commented on FLINK-5810:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3394#discussion_r113502388
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
---
    @@ -21,519 +21,897 @@
     import org.apache.flink.annotation.VisibleForTesting;
     import org.apache.flink.api.common.time.Time;
     import org.apache.flink.runtime.clusterframework.types.AllocationID;
    -import org.apache.flink.runtime.clusterframework.types.ResourceID;
    -import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
    +import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
    +import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
     import org.apache.flink.runtime.clusterframework.types.SlotID;
     import org.apache.flink.runtime.concurrent.BiFunction;
    +import org.apache.flink.runtime.concurrent.CompletableFuture;
     import org.apache.flink.runtime.concurrent.Future;
    -import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
    +import org.apache.flink.runtime.concurrent.ScheduledExecutor;
    +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.Acknowledge;
     import org.apache.flink.runtime.resourcemanager.SlotRequest;
    -import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRegistered;
    -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
    -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
    -import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
    +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
    +import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
     import org.apache.flink.runtime.taskexecutor.SlotReport;
     import org.apache.flink.runtime.taskexecutor.SlotStatus;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
    +import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
    +import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
     import org.apache.flink.util.Preconditions;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
     import java.util.LinkedHashMap;
     import java.util.Map;
    -
    -import static org.apache.flink.util.Preconditions.checkNotNull;
    +import java.util.Objects;
    +import java.util.UUID;
    +import java.util.concurrent.CancellationException;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.ScheduledFuture;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
     
     /**
    - * SlotManager is responsible for receiving slot requests and do slot allocations. It
allows to request
    - * slots from registered TaskManagers and issues container allocation requests in case
of there are not
    - * enough available slots. Besides, it should sync its slot allocation with TaskManager's
heartbeat.
    - * <p>
    - * The main operation principle of SlotManager is:
    - * <ul>
    - * <li>1. All slot allocation status should be synced with TaskManager, which is
the ground truth.</li>
    - * <li>2. All slots that have registered must be tracked, either by free pool or
allocated pool.</li>
    - * <li>3. All slot requests will be handled by best efforts, there is no guarantee
that one request will be
    - * fulfilled in time or correctly allocated. Conflicts or timeout or some special error
will happen, it should
    - * be handled outside SlotManager. SlotManager will make each decision based on the information
it currently
    - * holds.</li>
    - * </ul>
    - * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
    + * The slot manager is responsible for maintaining a view on all registered task manager
slots,
    + * their allocation and all pending slot requests. Whenever a new slot is registered
or and
    + * allocated slot is freed, then it tries to fulfill another pending slot request. Whenever
there
    + * are not enough slots available the slot manager will notify the resource manager about
it via
    + * {@link ResourceManagerActions#allocateResource(ResourceProfile)}.
    + *
    + * In order to free resources and avoid resource leaks, idling task managers (task managers
whose
    + * slots are currently not used) and not fulfilled pending slot requests time out triggering
their
    + * release and failure, respectively.
      */
    -public abstract class SlotManager {
    +public class SlotManager implements AutoCloseable {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class);
    +
    +	/** Scheduled executor for timeouts */
    +	private final ScheduledExecutor scheduledExecutor;
    +
    +	/** Timeout for slot requests to the task manager */
    +	private final Time taskManagerRequestTimeout;
    +
    +	/** Timeout after which an allocation is discarded */
    +	private final Time slotRequestTimeout;
    +
    +	/** Timeout after which an unused TaskManager is released */
    +	private final Time taskManagerTimeout;
    +
    +	/** Map for all registered slots */
    +	private final HashMap<SlotID, TaskManagerSlot> slots;
     
    -	protected final Logger LOG = LoggerFactory.getLogger(getClass());
    +	/** Index of all currently free slots */
    +	private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;
     
    -	/** The Resource allocation provider */
    -	protected final ResourceManagerServices rmServices;
    +	/** All currently registered task managers */
    +	private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations;
     
    -	/** All registered task managers with ResourceID and gateway. */
    -	private final Map<ResourceID, TaskExecutorRegistration> taskManagers;
    +	/** Map of fulfilled and active allocations for request deduplication purposes */
    +	private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;
     
    -	/** All registered slots, including free and allocated slots */
    -	private final Map<ResourceID, Map<SlotID, ResourceSlot>> registeredSlots;
    +	/** Map of pending/unfulfilled slot allocation requests */
    +	private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests;
     
    -	/** All pending slot requests, waiting available slots to fulfil */
    -	private final Map<AllocationID, SlotRequest> pendingSlotRequests;
    +	/** Leader id of the containing component */
    +	private UUID leaderId;
     
    -	/** All free slots that can be used to be allocated */
    -	private final Map<SlotID, ResourceSlot> freeSlots;
    +	/** Executor for future callbacks which have to be "synchronized" */
    +	private Executor mainThreadExecutor;
     
    -	/** All allocations, we can lookup allocations either by SlotID or AllocationID */
    -	private final AllocationMap allocationMap;
    +	/** Callbacks for resource (de-)allocations */
    +	private ResourceManagerActions resourceManagerActions;
     
    -	private final Time timeout;
    +	/** True iff the component has been started */
    +	private boolean started;
     
    -	public SlotManager(ResourceManagerServices rmServices) {
    -		this.rmServices = checkNotNull(rmServices);
    -		this.registeredSlots = new HashMap<>(16);
    -		this.pendingSlotRequests = new LinkedHashMap<>(16);
    -		this.freeSlots = new HashMap<>(16);
    -		this.allocationMap = new AllocationMap();
    -		this.taskManagers = new HashMap<>();
    -		this.timeout = Time.seconds(10);
    +	public SlotManager(
    +			ScheduledExecutor scheduledExecutor,
    +			Time taskManagerRequestTimeout,
    +			Time slotRequestTimeout,
    +			Time taskManagerTimeout) {
    +		this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
    +		this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
    +		this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
    +		this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);
    +
    +		slots = new HashMap<>(16);
    +		freeSlots = new LinkedHashMap<>(16);
    +		taskManagerRegistrations = new HashMap<>(4);
    +		fulfilledSlotRequests = new HashMap<>(16);
    +		pendingSlotRequests = new HashMap<>(16);
    +
    +		leaderId = null;
    +		resourceManagerActions = null;
    +		started = false;
     	}
     
    -	// ------------------------------------------------------------------------
    -	//  slot managements
    -	// ------------------------------------------------------------------------
    +	// ---------------------------------------------------------------------------------------------
    +	// Component lifecycle methods
    +	// ---------------------------------------------------------------------------------------------
     
     	/**
    -	 * Request a slot with requirements, we may either fulfill the request or pending it.
Trigger container
    -	 * allocation if we don't have enough resource. If we have free slot which can match
the request, record
    -	 * this allocation and forward the request to TaskManager through ResourceManager (we
want this done by
    -	 * RPC's main thread to avoid race condition).
    +	 * Starts the slot manager with the given leader id and resource manager actions.
     	 *
    -	 * @param request The detailed request of the slot
    -	 * @return RMSlotRequestRegistered The confirmation message to be send to the caller
    +	 * @param newLeaderId to use for communication with the task managers
    +	 * @param newResourceManagerActions to use for resource (de-)allocations
    +	 */
    +	public void start(UUID newLeaderId, Executor newMainThreadExecutor, ResourceManagerActions
newResourceManagerActions) {
    +		leaderId = Preconditions.checkNotNull(newLeaderId);
    +		mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
    +		resourceManagerActions = Preconditions.checkNotNull(newResourceManagerActions);
    +
    +		started = true;
    +	}
    +
    +	/**
    +	 * Suspends the component. This clears the internal state of the slot manager.
     	 */
    -	public RMSlotRequestRegistered requestSlot(final SlotRequest request) {
    -		final AllocationID allocationId = request.getAllocationId();
    -		if (isRequestDuplicated(request)) {
    -			LOG.warn("Duplicated slot request, AllocationID:{}", allocationId);
    -			return new RMSlotRequestRegistered(allocationId);
    +	public void suspend() {
    +		for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {
    +			cancelPendingSlotRequest(pendingSlotRequest);
     		}
     
    -		// try to fulfil the request with current free slots
    -		final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
    -		if (slot != null) {
    -			LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
    -				allocationId, request.getJobId());
    +		pendingSlotRequests.clear();
    +
    +		HashSet<InstanceID> registeredTaskManagers = new HashSet<>(taskManagerRegistrations.keySet());
    +
    +		for (InstanceID registeredTaskManager : registeredTaskManagers) {
    +			unregisterTaskManager(registeredTaskManager);
    +		}
    +
    +		leaderId = null;
    +		resourceManagerActions = null;
    +		started = false;
    +	}
    +
    +	/**
    +	 * Closes the slot manager.
    +	 *
    +	 * @throws Exception if the close operation fails
    +	 */
    +	@Override
    +	public void close() throws Exception {
    +		suspend();
    +	}
    +
    +	// ---------------------------------------------------------------------------------------------
    +	// Public API
    +	// ---------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Requests a slot with the respective resource profile.
    +	 *
    +	 * @param slotRequest specifying the requested slot specs
    +	 * @return true if the slot request was registered; false if the request is a duplicate
    +	 * @throws SlotManagerException if the slot request failed (e.g. not enough resources
left)
    +	 */
    +	public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException
{
    +		checkInit();
     
    -			// record this allocation in bookkeeping
    -			allocationMap.addAllocation(slot.getSlotId(), allocationId);
    -			// remove selected slot from free pool
    -			freeSlots.remove(slot.getSlotId());
    +		if (checkDuplicateRequest(slotRequest.getAllocationId())) {
    +			LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId());
     
    -			sendSlotRequest(slot, request);
    +			return false;
     		} else {
    -			LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " +
    -				"AllocationID:{}, JobID:{}", allocationId, request.getJobId());
    -			Preconditions.checkState(rmServices != null,
    -				"Attempted to allocate resources but no ResourceManagerServices set.");
    -			rmServices.allocateResource(request.getResourceProfile());
    -			pendingSlotRequests.put(allocationId, request);
    +			PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);
    +
    +			pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);
    +
    +			try {
    +				internalRequestSlot(pendingSlotRequest);
    +			} catch (ResourceManagerException e) {
    +				// requesting the slot failed --> remove pending slot request
    +				pendingSlotRequests.remove(slotRequest.getAllocationId());
    +
    +				throw new SlotManagerException("Could not fulfill slot request " + slotRequest.getAllocationId()
+ '.', e);
    +			}
    +
    +			return true;
     		}
    +	}
     
    -		return new RMSlotRequestRegistered(allocationId);
    +	/**
    +	 * Cancels and removes a pending slot request with the given allocation id. If there
is no such
    +	 * pending request, then nothing is done.
    +	 *
    +	 * @param allocationId identifying the pending slot request
    +	 * @return True if a pending slot request was found; otherwise false
    +	 */
    +	public boolean unregisterSlotRequest(AllocationID allocationId) {
    +		checkInit();
    +
    +		PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId);
    +
    +		if (null != pendingSlotRequest) {
    +			cancelPendingSlotRequest(pendingSlotRequest);
    +
    +			return true;
    +		} else {
    +			LOG.debug("No pending slot request with allocation id {} found.", allocationId);
    +
    +			return false;
    +		}
     	}
     
     	/**
    -	 * Notifies the SlotManager that a slot is available again after being allocated.
    -	 * @param slotID slot id of available slot
    +	 * Registers a new task manager at the slot manager. This will make the task managers
slots
    +	 * known and, thus, available for allocation.
    +	 *
    +	 * @param taskExecutorConnection for the new task manager
    +	 * @param initialSlotReport for the new task manager
     	 */
    -	public void notifySlotAvailable(ResourceID resourceID, SlotID slotID) {
    -		if (!allocationMap.isAllocated(slotID)) {
    -			throw new IllegalStateException("Slot was not previously allocated but " +
    -				"TaskManager reports it as available again");
    +	public void registerTaskManager(final TaskExecutorConnection taskExecutorConnection,
SlotReport initialSlotReport) {
    +		checkInit();
    +
    +		// we identify task managers by their instance id
    +		if (!taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID()))
{
    +			TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(taskExecutorConnection);
    +			taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskManagerRegistration);
     		}
    -		allocationMap.removeAllocation(slotID);
    -		final Map<SlotID, ResourceSlot> slots = registeredSlots.get(resourceID);
    -		ResourceSlot freeSlot = slots.get(slotID);
    -		if (freeSlot == null) {
    -			throw new IllegalStateException("Slot was not registered with SlotManager but " +
    -				"TaskManager reported it to be available.");
    +
    +		reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
    +	}
    +
    +	/**
    +	 * Unregisters the task manager identified by the given instance id and its associated
slots
    +	 * from the slot manager.
    +	 *
    +	 * @param instanceId identifying the task manager to unregister
    +	 * @return True if there existed a registered task manager with the given instance id
    +	 */
    +	public boolean unregisterTaskManager(InstanceID instanceId) {
    +		checkInit();
    +
    +		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.remove(instanceId);
    +
    +		if (null != taskManagerRegistration) {
    +			removeSlots(taskManagerRegistration.getSlots());
    +
    +			taskManagerRegistration.cancelTimeout();
    +
    +			return true;
    +		} else {
    +			LOG.debug("There is no task manager registered with instance ID {}. Ignoring this
message.", instanceId);
    +
    +			return false;
     		}
    -		handleFreeSlot(freeSlot);
     	}
     
     	/**
    -	 * The slot request to TaskManager may be either failed by rpc communication (timeout,
network error, etc.)
    -	 * or really rejected by TaskManager. We shall retry this request by:
    -	 * <ul>
    -	 * <li>1. verify and clear all the previous allocate information for this request
    -	 * <li>2. try to request slot again
    -	 * </ul>
    -	 * <p>
    -	 * This may cause some duplicate allocation, e.g. the slot request to TaskManager is
successful but the response
    -	 * is lost somehow, so we may request a slot in another TaskManager, this causes two
slots assigned to one request,
    -	 * but it can be taken care of by rejecting registration at JobManager.
    +	 * Reports the current slot allocations for a task manager identified by the given instance
id.
     	 *
    -	 * @param originalRequest The original slot request
    -	 * @param slotId          The target SlotID
    +	 * @param instanceId identifying the task manager for which to report the slot status
    +	 * @param slotReport containing the status for all of its slots
    +	 * @return true if the slot status has been updated successfully, otherwise false
     	 */
    -	void handleSlotRequestFailedAtTaskManager(final SlotRequest originalRequest, final SlotID
slotId) {
    -		final AllocationID originalAllocationId = originalRequest.getAllocationId();
    -		LOG.info("Slot request failed at TaskManager, SlotID:{}, AllocationID:{}, JobID:{}",
    -			slotId, originalAllocationId, originalRequest.getJobId());
    -
    -		if (allocationMap.isAllocated(slotId)) {
    -			final AllocationID expectedAllocationId = allocationMap.getAllocationID(slotId);
    -
    -			// check whether we have an agreement on whom this slot belongs to
    -			if (originalAllocationId.equals(expectedAllocationId)) {
    -				LOG.info("De-allocate this request and retry");
    -				allocationMap.removeAllocation(expectedAllocationId);
    -				pendingSlotRequests.put(originalRequest.getAllocationId(), originalRequest);
    -				ResourceSlot slot = checkNotNull(getRegisteredSlot(slotId));
    -				// treat this slot as empty and retry with a different request
    -				handleFreeSlot(slot);
    +	public boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) {
    +		checkInit();
    +
    +		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);
    +
    +		if (null != taskManagerRegistration) {
    +			HashSet<SlotID> slotsToRemove = new HashSet<>(taskManagerRegistration.getSlots());
    +			boolean idle = true;
    +
    +			for (SlotStatus slotStatus : slotReport) {
    +				if (slotsToRemove.remove(slotStatus.getSlotID())) {
    +					// slot which was already registered
    +					updateSlot(slotStatus.getSlotID(), slotStatus.getAllocationID());
    +				} else {
    +					// new slot
    +					registerSlot(
    +						instanceId,
    +						slotStatus.getSlotID(),
    +						slotStatus.getAllocationID(),
    +						slotStatus.getResourceProfile(),
    +						taskManagerRegistration.getTaskManagerConnection());
    +				}
    +
    +				TaskManagerSlot slot = slots.get(slotStatus.getSlotID());
    +
    +				idle &= slot.isFree();
    +			}
    +
    +			// remove the slots for which we haven't received a slot status message
    +			removeSlots(slotsToRemove);
    +
    +			if (idle) {
    +				// no slot of this task manager is being used --> register timer to free this
resource
    +				registerTaskManagerTimeout(taskManagerRegistration);
    +			}
    +
    +			return true;
    +		} else {
    +			LOG.debug("Received slot report for unknown task manager with instance id {}. Ignoring
this report.", instanceId);
    +
    +			return false;
    +		}
    +	}
    +
    +	/**
    +	 * Free the given slot from the given allocation. If the slot is still allocated by
the given
    +	 * allocation id, then the slot will be marked as free and will be subject to new slot
requests.
    +	 *
    +	 * @param slotId identifying the slot to free
    +	 * @param allocationId with which the slot is presumably allocated
    +	 */
    +	public void freeSlot(SlotID slotId, AllocationID allocationId) {
    +		checkInit();
    +
    +		TaskManagerSlot slot = slots.get(slotId);
    +
    +		if (null != slot) {
    +			if (slot.isAllocated()) {
    +				if (Objects.equals(allocationId, slot.getAllocationId())) {
    +					// free the slot
    +					slot.setAllocationId(null);
    +					fulfilledSlotRequests.remove(allocationId);
    +
    +					if (slot.isFree()) {
    +						handleFreeSlot(slot);
    +					}
    +
    +					TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
    +
    +					if (null != taskManagerRegistration && !anySlotUsed(taskManagerRegistration.getSlots()))
{
    +						registerTaskManagerTimeout(taskManagerRegistration);
    +					}
    +				} else {
    +					LOG.debug("Received request to free slot {} with expected allocation id {}, " +
    +						"but actual allocation id {} differs. Ignoring the request.", slotId, allocationId,
slot.getAllocationId());
    +				}
     			} else {
    -				LOG.error("Slot request failed for slot {} with allocation id {}:" +
    -						" Allocation id did not match the expected allocation id {}.",
    -					slotId, originalAllocationId, expectedAllocationId);
    +				LOG.debug("Slot {} has not been allocated.", allocationId);
     			}
     		} else {
    -			LOG.error("Slot request failed for slot {} with allocation id {}: " +
    -					"Slot was not previously registered.",
    -				slotId, originalAllocationId);
    +			LOG.debug("Trying to free a slot {} which has not been registered. Ignoring this message.",
slotId);
     		}
     	}
     
    +	// ---------------------------------------------------------------------------------------------
    +	// Behaviour methods
    +	// ---------------------------------------------------------------------------------------------
    +
     	/**
    -	 * Registers a TaskExecutor
    -	 * @param resourceID TaskExecutor's ResourceID
    -	 * @param registration TaskExecutor's registration
    -	 * @param slotReport TaskExecutor's free and allocated slots
    +	 * Finds a matching slot request for a given resource profile. If there is no such request,
    +	 * the method returns null.
    +	 *
    +	 * Note: If you want to change the behaviour of the slot manager wrt slot allocation
and
    +	 * request fulfillment, then you should override this method.
    +	 *
    +	 * @param slotResourceProfile defining the resources of an available slot
    +	 * @return A matching slot request which can be deployed in a slot with the given resource
    +	 * profile. Null if there is no such slot request pending.
     	 */
    -	public void registerTaskExecutor(
    -			ResourceID resourceID,
    -			TaskExecutorRegistration registration,
    -			SlotReport slotReport) {
    +	protected PendingSlotRequest findMatchingRequest(ResourceProfile slotResourceProfile)
{
     
    -		if (taskManagers.get(resourceID) != null) {
    -			notifyTaskManagerFailure(resourceID);
    +		for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {
    +			if (!pendingSlotRequest.isAssigned() && slotResourceProfile.isMatching(pendingSlotRequest.getResourceProfile()))
{
    +				return pendingSlotRequest;
    +			}
     		}
     
    -		this.taskManagers.put(resourceID, registration);
    +		return null;
    +	}
    +
    +	/**
    +	 * Finds a matching slot for a given resource profile. A matching slot has at least
as many
    +	 * resources available as the given resource profile. If there is no such slot available,
then
    +	 * the method returns null.
    +	 *
    +	 * Note: If you want to change the behaviour of the slot manager wrt slot allocation
and
    +	 * request fulfillment, then you should override this method.
    +	 *
    +	 * @param requestResourceProfile specifying the resource requirements for the a slot
request
    +	 * @return A matching slot which fulfills the given resource profile. Null if there
is no such
    +	 * slot available.
    +	 */
    +	protected TaskManagerSlot findMatchingSlot(ResourceProfile requestResourceProfile) {
    +		Iterator<Map.Entry<SlotID, TaskManagerSlot>> iterator = freeSlots.entrySet().iterator();
    +
    +		while (iterator.hasNext()) {
    +			TaskManagerSlot taskManagerSlot = iterator.next().getValue();
     
    -		for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
    -			final SlotID slotId = slotStatus.getSlotID();
    +			// sanity check
    +			Preconditions.checkState(taskManagerSlot.isFree());
     
    -			final TaskExecutorRegistration taskExecutorRegistration = taskManagers.get(slotId.getResourceID());
    -			if (taskExecutorRegistration == null) {
    -				LOG.info("Received SlotStatus but ResourceID {} is unknown to the SlotManager",
    -					slotId.getResourceID());
    -				return;
    +			if (taskManagerSlot.getResourceProfile().isMatching(requestResourceProfile)) {
    +				iterator.remove();
    +				return taskManagerSlot;
     			}
    +		}
     
    -			final ResourceSlot slot = new ResourceSlot(slotId, slotStatus.getProfiler(), taskExecutorRegistration);
    +		return null;
    +	}
     
    -			registerNewSlot(slot);
    -			LOG.info("New slot appeared, SlotID:{}, AllocationID:{}", slotId, slotStatus.getAllocationID());
    +	// ---------------------------------------------------------------------------------------------
    +	// Internal slot operations
    +	// ---------------------------------------------------------------------------------------------
     
    -			if (slotStatus.getAllocationID() != null) {
    -				// slot in use, record this in bookkeeping
    -				allocationMap.addAllocation(slotId, slotStatus.getAllocationID());
    -			} else {
    +	/**
    +	 * Registers a slot for the given task manager at the slot manager. The task manager
is
    +	 * identified by the given instance id and the slot is identified by the given slot
id. The
    +	 * given resource profile defines the available resources for the slot. The task manager
    +	 * connection can be used to communicate with the task manager.
    +	 *
    +	 * @param instanceId identifying the task manager on which the slot lives
    +	 * @param slotId identifying the slot on the task manager
    +	 * @param allocationId which is currently deployed in the slot
    +	 * @param resourceProfile of the slot
    +	 * @param taskManagerConnection to communicate with the remote task manager
    +	 */
    +	private void registerSlot(
    +			InstanceID instanceId,
    +			SlotID slotId,
    +			AllocationID allocationId,
    +			ResourceProfile resourceProfile,
    +			TaskExecutorConnection taskManagerConnection) {
    +		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);
    +
    +		if (null != taskManagerRegistration) {
    +			TaskManagerSlot slot = new TaskManagerSlot(
    +				slotId,
    +				resourceProfile,
    +				taskManagerConnection,
    +				allocationId);
    +
    +			slots.put(slotId, slot);
    +
    +			taskManagerRegistration.addSlot(slotId);
    +
    +			if (slot.isFree()) {
     				handleFreeSlot(slot);
     			}
    +
    +			if (slot.isAllocated()) {
    +				fulfilledSlotRequests.put(slot.getAllocationId(), slotId);
    +			}
    +		} else {
    +			LOG.debug("Trying to register slot for unknown task manager with instance id {}.",
instanceId);
     		}
     	}
     
     	/**
    -	 * Callback for TaskManager failures. In case that a TaskManager fails, we have to clean
up all its slots.
    +	 * Updates a slot with the given allocation id.
     	 *
    -	 * @param resourceId The ResourceID of the TaskManager
    +	 * @param slotId to update
    +	 * @param allocationId specifying the current allocation of the slot
     	 */
    -	public void notifyTaskManagerFailure(final ResourceID resourceId) {
    -		LOG.info("Resource:{} been notified failure", resourceId);
    -		taskManagers.remove(resourceId);
    -		final Map<SlotID, ResourceSlot> slotIdsToRemove = registeredSlots.remove(resourceId);
    -		if (slotIdsToRemove != null) {
    -			for (SlotID slotId : slotIdsToRemove.keySet()) {
    -				LOG.info("Removing Slot: {} upon resource failure", slotId);
    -				if (freeSlots.containsKey(slotId)) {
    -					freeSlots.remove(slotId);
    -				} else if (allocationMap.isAllocated(slotId)) {
    -					allocationMap.removeAllocation(slotId);
    -				} else {
    -					LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId);
    +	private void updateSlot(SlotID slotId, AllocationID allocationId) {
    +		TaskManagerSlot slot = slots.get(slotId);
    +
    +		if (null != slot) {
    +			// we assume the given allocation id to be the ground truth (coming from the TM)
    +			slot.setAllocationId(allocationId);
    +
    +			if (null != allocationId) {
    +				if (slot.hasPendingSlotRequest()){
    +					// we have a pending slot request --> check whether we have to reject it
    +					PendingSlotRequest pendingSlotRequest = slot.getAssignedSlotRequest();
    +
    +					if (Objects.equals(pendingSlotRequest.getAllocationId(), allocationId)) {
    +						// we can cancel the slot request because it has been fulfilled
    +						cancelPendingSlotRequest(pendingSlotRequest);
    +
    +						// remove the pending slot request, since it has been completed
    +						pendingSlotRequests.remove(pendingSlotRequest.getAllocationId());
    +					} else {
    +						// this will try to find a new slot for the request
    +						rejectPendingSlotRequest(
    +							pendingSlotRequest,
    +							new Exception("Task manager reported slot " + slotId + " being already allocated."));
    +					}
    +
    +					slot.setAssignedSlotRequest(null);
    +				}
    +
    +				fulfilledSlotRequests.put(allocationId, slotId);
    +
    +				TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
    +
    +				if (null != taskManagerRegistration) {
    +					// disable any registered time out for the task manager
    +					taskManagerRegistration.cancelTimeout();
     				}
     			}
    +		} else {
    +			LOG.debug("Trying to update unknown slot with slot id {}.", slotId);
     		}
     	}
     
    -	// ------------------------------------------------------------------------
    -	//  internal behaviors
    -	// ------------------------------------------------------------------------
    -
     	/**
    -	 * When we have a free slot, try to fulfill the pending request first. If any request
can be fulfilled,
    -	 * record this allocation in bookkeeping and send slot request to TaskManager, else
we just add this slot
    -	 * to the free pool.
    +	 * Tries to allocate a slot for the given slot request. If there is no slot available,
the
    +	 * resource manager is informed to allocate more resources and a timeout for the request
is
    +	 * registered.
     	 *
    -	 * @param freeSlot The free slot
    +	 * @param pendingSlotRequest to allocate a slot for
    +	 * @throws ResourceManagerException if the resource manager cannot allocate more resource
     	 */
    -	private void handleFreeSlot(final ResourceSlot freeSlot) {
    -		SlotRequest chosenRequest = chooseRequestToFulfill(freeSlot, pendingSlotRequests);
    +	private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException
{
    +		TaskManagerSlot taskManagerSlot = findMatchingSlot(pendingSlotRequest.getResourceProfile());
     
    -		if (chosenRequest != null) {
    -			final AllocationID allocationId = chosenRequest.getAllocationId();
    -			final SlotRequest slotRequest = pendingSlotRequests.remove(allocationId);
    +		if (taskManagerSlot != null) {
    +			allocateSlot(taskManagerSlot, pendingSlotRequest);
    +		} else {
    +			final UUID timeoutIdentifier = UUID.randomUUID();
    --- End diff --
    
    True, indeed. I think we don't need these identifiers anymore if we have a single timeout
task which periodically checks all task managers and pending slot requests.


> Harden SlotManager
> ------------------
>
>                 Key: FLINK-5810
>                 URL: https://issues.apache.org/jira/browse/FLINK-5810
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination
>    Affects Versions: 1.3.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>
> Harden the {{SlotManager}} logic to better cope with lost messages.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message