flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm <...@git.apache.org>
Subject [GitHub] flink pull request #2571: [FLINK-4348] Simplify logic of SlotManager
Date Thu, 06 Oct 2016 10:20:37 GMT
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2571#discussion_r82157384
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
---
    @@ -136,53 +127,34 @@ public SlotRequestRegistered requestSlot(final SlotRequest request)
{
     			// record this allocation in bookkeeping
     			allocationMap.addAllocation(slot.getSlotId(), allocationId);
     			// remove selected slot from free pool
    -			final ResourceSlot removedSlot = freeSlots.remove(slot.getSlotId());
    -
    -			final Future<SlotRequestReply> slotRequestReplyFuture =
    -				slot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
    -
    -			slotRequestReplyFuture.handleAsync(new BiFunction<SlotRequestReply, Throwable,
Object>() {
    -				@Override
    -				public Object apply(SlotRequestReply slotRequestReply, Throwable throwable) {
    -					if (throwable != null) {
    -						// we failed, put the slot and the request back again
    -						if (allocationMap.isAllocated(slot.getSlotId())) {
    -							// only re-add if the slot hasn't been removed in the meantime
    -							freeSlots.put(slot.getSlotId(), removedSlot);
    -						}
    -						pendingSlotRequests.put(allocationId, request);
    -					}
    -					return null;
    -				}
    -			}, resourceManagerServices.getExecutor());
    +			freeSlots.remove(slot.getSlotId());
    +
    +			sendSlotRequest(slot, request);
     		} else {
     			LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " +
     				"AllocationID:{}, JobID:{}", allocationId, request.getJobId());
    -			Preconditions.checkState(resourceManagerServices != null,
    +			Preconditions.checkState(rmServices != null,
     				"Attempted to allocate resources but no ResourceManagerServices set.");
    -			resourceManagerServices.allocateResource(request.getResourceProfile());
    +			rmServices.allocateResource(request.getResourceProfile());
     			pendingSlotRequests.put(allocationId, request);
     		}
     
    -		return new SlotRequestRegistered(allocationId);
    +		return new RMSlotRequestRegistered(allocationId);
     	}
     
     	/**
    -	 * Sync slot status with TaskManager's SlotReport.
    +	 * Notifies the SlotManager that a slot is available again after being allocated.
    +	 * @param slotID slot id of available slot
     	 */
    -	public void updateSlotStatus(final SlotReport slotReport) {
    -		for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
    -			updateSlotStatus(slotStatus);
    +	public void notifySlotAvailable(ResourceID resourceID, SlotID slotID) {
    +		if (!allocationMap.isAllocated(slotID)) {
    +			throw new IllegalStateException("Slot was not previously allocated but " +
    +				"TaskManager reports it as available again");
     		}
    -	}
    -
    -	/**
    -	 * Registers a TaskExecutor
    -	 * @param resourceID TaskExecutor's ResourceID
    -	 * @param gateway TaskExcutor's gateway
    -	 */
    -	public void registerTaskExecutor(ResourceID resourceID, TaskExecutorGateway gateway)
{
    -		this.taskManagerGateways.put(resourceID, gateway);
    +		allocationMap.removeAllocation(slotID);
    +		final Map<SlotID, ResourceSlot> slots = registeredSlots.get(resourceID);
    +		ResourceSlot freeSlot = slots.get(slotID);
    --- End diff --
    
    Actually, this should never happen but I'll introduce a check nevertheless.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message