flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Date Thu, 14 Dec 2017 10:10:23 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5091#discussion_r156901374
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
---
    @@ -266,104 +279,367 @@ public void disconnectResourceManager() {
     	// ------------------------------------------------------------------------
     
     	@Override
    -	public CompletableFuture<SimpleSlot> allocateSlot(
    -			SlotRequestID requestId,
    -			ScheduledUnit task,
    -			ResourceProfile resources,
    -			Iterable<TaskManagerLocation> locationPreferences,
    +	public CompletableFuture<LogicalSlot> allocateSlot(
    +			SlotRequestId slotRequestId,
    +			ScheduledUnit scheduledUnit,
    +			ResourceProfile resourceProfile,
    +			Collection<TaskManagerLocation> locationPreferences,
    +			boolean allowQueuedScheduling,
     			Time timeout) {
     
    -		return internalAllocateSlot(requestId, task, resources, locationPreferences);
    +		return internalAllocateSlot(
    +			slotRequestId,
    +			scheduledUnit,
    +			resourceProfile,
    +			locationPreferences,
    +			allowQueuedScheduling);
     	}
     
    -	@Override
    -	public void returnAllocatedSlot(Slot slot) {
    -		internalReturnAllocatedSlot(slot);
    +	private CompletableFuture<LogicalSlot> internalAllocateSlot(
    +			SlotRequestId slotRequestId,
    +			ScheduledUnit task,
    +			ResourceProfile resourceProfile,
    +			Collection<TaskManagerLocation> locationPreferences,
    +			boolean allowQueuedScheduling) {
    +
    +		final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
    +
    +		if (slotSharingGroupId != null) {
    +			// allocate slot with slot sharing
    +			final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
    +				slotSharingGroupId,
    +				id -> new SlotSharingManager(
    +					id,
    +					this,
    +					providerAndOwner));
    +
    +			final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture;
    +
    +			try {
    +				if (task.getCoLocationConstraint() != null) {
    +					multiTaskSlotFuture = allocateCoLocatedMultiTaskSlot(
    +						task.getCoLocationConstraint(),
    +						multiTaskSlotManager,
    +						resourceProfile,
    +						locationPreferences,
    +						allowQueuedScheduling);
    +				} else {
    +					multiTaskSlotFuture = allocateMultiTaskSlot(
    +						task.getJobVertexId(), multiTaskSlotManager,
    +						resourceProfile,
    +						locationPreferences,
    +						allowQueuedScheduling);
    +				}
    +			} catch (NoResourceAvailableException noResourceException) {
    +				return FutureUtils.completedExceptionally(noResourceException);
    +			}
    +
    +			// sanity check
    +			Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId()));
    +
    +			final SlotSharingManager.SingleTaskSlot leave = multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot(
    +				slotRequestId,
    +				task.getJobVertexId(),
    +				multiTaskSlotFuture.getLocality());
    +
    +			return leave.getLogicalSlotFuture();
    +		} else {
    +			// request an allocated slot to assign a single logical slot to
    +			CompletableFuture<SlotAndLocality> slotAndLocalityFuture = requestAllocatedSlot(
    +				slotRequestId,
    +				resourceProfile,
    +				locationPreferences,
    +				allowQueuedScheduling);
    +
    +			return slotAndLocalityFuture.thenApply(
    +				(SlotAndLocality slotAndLocality) -> {
    +					final AllocatedSlot allocatedSlot = slotAndLocality.getSlot();
    +
    +					final SingleLogicalSlot singleTaskSlot = new SingleLogicalSlot(
    +						slotRequestId,
    +						allocatedSlot,
    +						null,
    +						slotAndLocality.getLocality(),
    +						providerAndOwner);
    +
    +					if (allocatedSlot.tryAssignPayload(singleTaskSlot)) {
    +						return singleTaskSlot;
    +					} else {
    +						final FlinkException flinkException = new FlinkException("Could not assign payload to allocated slot " + allocatedSlot.getAllocationId() + '.');
    +						releaseSlot(slotRequestId, null, flinkException);
    +						throw new CompletionException(flinkException);
    +					}
    +				});
    +		}
     	}
     
    -	@Override
    -	public CompletableFuture<Acknowledge> cancelSlotAllocation(SlotRequestID requestId)
{
    -		final PendingRequest pendingRequest = removePendingRequest(requestId);
    +	/**
    +	 * Allocates a co-located {@link SlotSharingManager.MultiTaskSlot} for the given {@link
CoLocationConstraint}.
    +	 *
    +	 * <p>If allowQueuedScheduling is true, then the returned {@link SlotSharingManager.MultiTaskSlot}
can be
    +	 * uncompleted.
    +	 *
    +	 * @param coLocationConstraint for which to allocate a {@link SlotSharingManager.MultiTaskSlot}
    +	 * @param multiTaskSlotManager responsible for the slot sharing group for which to allocate
the slot
    +	 * @param resourceProfile specifying the requirements for the requested slot
    +	 * @param locationPreferences containing preferred TaskExecutors on which to allocate
the slot
    +	 * @param allowQueuedScheduling true if queued scheduling (the returned task slot need not be completed yet) is allowed, otherwise false
    +	 * @return A {@link SlotSharingManager.MultiTaskSlotLocality} which contains the allocated {@link SlotSharingManager.MultiTaskSlot}
    +	 * 		and its locality wrt the given location preferences
    +	 * @throws NoResourceAvailableException if no task slot could be allocated
    +	 */
    +	private SlotSharingManager.MultiTaskSlotLocality allocateCoLocatedMultiTaskSlot(
    +			CoLocationConstraint coLocationConstraint,
    +			SlotSharingManager multiTaskSlotManager,
    +			ResourceProfile resourceProfile,
    +			Collection<TaskManagerLocation> locationPreferences,
    +			boolean allowQueuedScheduling) throws NoResourceAvailableException {
    +		final SlotRequestId coLocationSlotRequestId = coLocationConstraint.getSlotRequestId();
    +
    +		if (coLocationSlotRequestId != null) {
    +			// we have a slot assigned --> try to retrieve it
    +			final SlotSharingManager.TaskSlot taskSlot = multiTaskSlotManager.getTaskSlot(coLocationSlotRequestId);
    +
    +			if (taskSlot != null) {
    +				Preconditions.checkState(taskSlot instanceof SlotSharingManager.MultiTaskSlot);
    +				return SlotSharingManager.MultiTaskSlotLocality.of(((SlotSharingManager.MultiTaskSlot)
taskSlot), Locality.LOCAL);
    +			} else {
    +				// the slot may have been cancelled in the meantime
    +				coLocationConstraint.setSlotRequestId(null);
    +			}
    +		}
     
    -		if (pendingRequest != null) {
    -			failPendingRequest(pendingRequest, new CancellationException("Allocation with request
id" + requestId + " cancelled."));
    +		final Collection<TaskManagerLocation> actualLocationPreferences;
    +
    +		if (coLocationConstraint.isAssigned()) {
    +			actualLocationPreferences = Collections.singleton(coLocationConstraint.getLocation());
     		} else {
    -			final Slot slot = allocatedSlots.get(requestId);
    +			actualLocationPreferences = locationPreferences;
    +		}
    +
    +		// get a new multi task slot
    +		final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = allocateMultiTaskSlot(
    +			coLocationConstraint.getGroupId(), multiTaskSlotManager,
    +			resourceProfile,
    +			actualLocationPreferences,
    +			allowQueuedScheduling);
    +
    +		// check whether we fulfill the co-location constraint
    +		if (coLocationConstraint.isAssigned() && multiTaskSlotLocality.getLocality()
!= Locality.LOCAL) {
    +			multiTaskSlotLocality.getMultiTaskSlot().release(
    +				new FlinkException("Multi task slot is not local and, thus, does not fulfill the
co-location constraint."));
     
    -			if (slot != null) {
    -				LOG.info("Returning allocated slot {} because the corresponding allocation request
{} was cancelled.", slot, requestId);
    -				if (slot.markCancelled()) {
    -					internalReturnAllocatedSlot(slot);
    +			throw new NoResourceAvailableException("Could not allocate a local multi task slot
for the " +
    +				"co location constraint " + coLocationConstraint + '.');
    +		}
    +
    +		final SlotRequestId slotRequestId = new SlotRequestId();
    +		final SlotSharingManager.MultiTaskSlot coLocationSlot = multiTaskSlotLocality.getMultiTaskSlot().allocateMultiTaskSlot(
    +			slotRequestId,
    +			coLocationConstraint.getGroupId());
    +
    +		// mark the requested slot as co-located slot for other co-located tasks
    +		coLocationConstraint.setSlotRequestId(slotRequestId);
    +
    +		// lock the co-location constraint once we have obtained the allocated slot
    +		coLocationSlot.getSlotContextFuture().whenComplete(
    +			(SlotContext slotContext, Throwable throwable) -> {
    +				if (throwable == null) {
    +					// check whether we are still assigned to the co-location constraint
    +					if (Objects.equals(coLocationConstraint.getSlotRequestId(), slotRequestId)) {
    +						coLocationConstraint.lockLocation(slotContext.getTaskManagerLocation());
    +					}
     				}
    +			});
    +
    +		return SlotSharingManager.MultiTaskSlotLocality.of(coLocationSlot, multiTaskSlotLocality.getLocality());
    +	}
    +
    +	/**
    +	 * Allocates a {@link SlotSharingManager.MultiTaskSlot} for the given groupId which
is in the
    +	 * slot sharing group for which the given {@link SlotSharingManager} is responsible.
    +	 *
    +	 * <p>If allowQueuedScheduling is true, then the method can return an uncompleted
{@link SlotSharingManager.MultiTaskSlot}.
    +	 *
    +	 * @param groupId for which to allocate a new {@link SlotSharingManager.MultiTaskSlot}
    +	 * @param slotSharingManager responsible for the slot sharing group for which to allocate
the slot
    +	 * @param resourceProfile specifying the requirements for the requested slot
    +	 * @param locationPreferences containing preferred TaskExecutors on which to allocate
the slot
    +	 * @param allowQueuedScheduling true if queued scheduling (the returned task slot need not be completed yet) is allowed, otherwise false
    +	 * @return A {@link SlotSharingManager.MultiTaskSlotLocality} which contains the allocated
{@link SlotSharingManager.MultiTaskSlot}
    +	 * 		and its locality wrt the given location preferences
    +	 * @throws NoResourceAvailableException if no task slot could be allocated
    +	 */
    +	private SlotSharingManager.MultiTaskSlotLocality allocateMultiTaskSlot(
    +			AbstractID groupId,
    +			SlotSharingManager slotSharingManager,
    +			ResourceProfile resourceProfile,
    +			Collection<TaskManagerLocation> locationPreferences,
    +			boolean allowQueuedScheduling) throws NoResourceAvailableException {
    +
    +		// check first whether we have a resolved root slot which we can use
    +		SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = slotSharingManager.getResolvedRootSlot(
    +			groupId,
    +			locationPreferences);
    +
    +		if (multiTaskSlotLocality != null && multiTaskSlotLocality.getLocality() ==
Locality.LOCAL) {
    +			return multiTaskSlotLocality;
    +		}
    +
    +		final SlotRequestId allocatedSlotRequestId = new SlotRequestId();
    +		final SlotRequestId multiTaskSlotRequestId = new SlotRequestId();
    +
    +		// check whether we have an allocated slot available which we can use to create a new
multi task slot in
    +		final SlotAndLocality slotAndLocality = pollAndAllocateSlot(allocatedSlotRequestId,
resourceProfile, locationPreferences);
    +
    +		if (slotAndLocality != null && (slotAndLocality.getLocality() == Locality.LOCAL
|| multiTaskSlotLocality == null)) {
    +
    +			final AllocatedSlot allocatedSlot = slotAndLocality.getSlot();
    +			final SlotSharingManager.MultiTaskSlot multiTaskSlot = slotSharingManager.createRootSlot(
    +				multiTaskSlotRequestId,
    +				CompletableFuture.completedFuture(slotAndLocality.getSlot()),
    +				allocatedSlotRequestId);
    +
    +			if (allocatedSlot.tryAssignPayload(multiTaskSlot)) {
    +				return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, slotAndLocality.getLocality());
     			} else {
    -				LOG.debug("There was no slot allocation with {} to be cancelled.", requestId);
    +				multiTaskSlot.release(new FlinkException("Could not assign payload to allocated slot
" +
    +					allocatedSlot.getAllocationId() + '.'));
     			}
     		}
     
    -		return CompletableFuture.completedFuture(Acknowledge.get());
    -	}
    +		if (multiTaskSlotLocality != null) {
    +			// prefer slot sharing group slots over unused slots
    +			if (slotAndLocality != null) {
    +				releaseSlot(
    +					allocatedSlotRequestId,
    +					null,
    +					new FlinkException("Locality constraint is not better fulfilled by allocated slot."));
    +			}
    +			return multiTaskSlotLocality;
    +		}
     
    -	CompletableFuture<SimpleSlot> internalAllocateSlot(
    -			SlotRequestID requestId,
    -			ScheduledUnit task,
    -			ResourceProfile resources,
    -			Iterable<TaskManagerLocation> locationPreferences) {
    +		if (allowQueuedScheduling) {
    +			// there is no slot immediately available --> check first for uncompleted slots
at the slot sharing group
    +			SlotSharingManager.MultiTaskSlot multiTaskSlotFuture = slotSharingManager.getUnresolvedRootSlot(groupId);
    +
    +			if (multiTaskSlotFuture == null) {
    +				// it seems as if we have to request a new slot from the resource manager; this is always the last resort!!!
    +				final CompletableFuture<AllocatedSlot> futureSlot = requestNewAllocatedSlot(allocatedSlotRequestId,
resourceProfile);
    +
    +				multiTaskSlotFuture = slotSharingManager.createRootSlot(
    +					multiTaskSlotRequestId,
    +					futureSlot.thenApply(Function.identity()),
    --- End diff --
    
    Good point. Will change it.


---

Mime
View raw message