flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Date Thu, 14 Dec 2017 10:13:41 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5091#discussion_r156902096
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
---
    @@ -0,0 +1,722 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster.slotpool;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.runtime.jobmaster.LogicalSlot;
    +import org.apache.flink.runtime.jobmaster.SlotContext;
    +import org.apache.flink.runtime.jobmaster.SlotOwner;
    +import org.apache.flink.runtime.jobmaster.SlotRequestId;
    +import org.apache.flink.runtime.instance.SlotSharingGroupId;
    +import org.apache.flink.runtime.jobmanager.scheduler.Locality;
    +import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
    +import org.apache.flink.util.AbstractID;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nullable;
    +
    +import java.util.AbstractCollection;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.CompletableFuture;
    +
    +/**
    + * Manager which is responsible for slot sharing. Slot sharing allows to run different
    + * tasks in the same slot and to realize co-location constraints.
    + *
    + * <p>The SlotSharingManager allows creating a hierarchy of {@link TaskSlot} such that
    + * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying
    + * the request for the TaskSlot and an {@link AbstractID} identifying the task or the
    + * co-location constraint running in this slot.
    + *
    + * <p>The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and
    + * {@link SingleTaskSlot}. The former class represents inner nodes which can contain
    + * a number of other {@link TaskSlot} and the latter class represents the leaf nodes.
    + * The hierarchy starts with a root {@link MultiTaskSlot} which has a future
    + * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot
    + * on the TaskExecutor in which all slots of this hierarchy run. A {@link MultiTaskSlot}
    + * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if
    + * the task slot does not yet contain another child with the same {@link AbstractID} identifying
    + * the actual task or the co-location constraint.
    + *
    + * <p>Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set
    + * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different
    + * task.
    + *
    + * <p>Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location
    + * constraint is uniquely identified by an {@link AbstractID} such that we cannot add a second co-located
    + * {@link MultiTaskSlot} to the same root node. All co-located tasks are then added to this co-located
    + * multi task slot.
    + */
    +public class SlotSharingManager {
    +
    +	// identifier of the slot sharing group managed by this instance
    +	private final SlotSharingGroupId slotSharingGroupId;
    +
    +	// needed to release allocated slots after a complete multi task slot hierarchy has been released
    +	private final AllocatedSlotActions allocatedSlotActions;
    +
    +	// owner of the slots to which to return them when they are released from the outside
    +	private final SlotOwner slotOwner;
    +
    +	// task slots registered with this manager (root slots and nested slots), keyed by their slot request id
    +	private final Map<SlotRequestId, TaskSlot> allTaskSlots;
    +
    +	// Root nodes which have not been resolved yet because the allocated slot is still pending
    +	private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots;
    +
    +	// Root nodes which have been resolved (the underlying allocated slot has been assigned),
    +	// grouped by the TaskManagerLocation of that allocated slot
    +	private final Map<TaskManagerLocation, Set<MultiTaskSlot>> resolvedRootSlots;
    +
    +	// Internal class to iterate over all resolved root slots; set to null by the constructor
    +	// (presumably created lazily further down in this file — not visible here, TODO confirm)
    +	private ResolvedRootSlotValues resolvedMultiTaskSlotValues;
    +
    +	public SlotSharingManager(
    +			SlotSharingGroupId slotSharingGroupId,
    +			AllocatedSlotActions allocatedSlotActions,
    +			SlotOwner slotOwner) {
    +		this.slotSharingGroupId = Preconditions.checkNotNull(slotSharingGroupId);
    +		this.allocatedSlotActions = Preconditions.checkNotNull(allocatedSlotActions);
    +		this.slotOwner = Preconditions.checkNotNull(slotOwner);
    +
    +		allTaskSlots = new HashMap<>(16);
    +		unresolvedRootSlots = new HashMap<>(16);
    +		resolvedRootSlots = new HashMap<>(16);
    +
    +		resolvedMultiTaskSlotValues = null;
    +	}
    +
    +	/**
    +	 * Checks whether this manager currently has no registered task slot at all.
    +	 *
    +	 * @return true if no task slot is registered, otherwise false
    +	 */
    +	public boolean isEmpty() {
    +		return allTaskSlots.isEmpty();
    +	}
    +
    +	/**
    +	 * Checks whether a task slot is registered under the given slot request id.
    +	 *
    +	 * @param slotRequestId to look up
    +	 * @return true if such a task slot is registered, otherwise false
    +	 */
    +	public boolean contains(SlotRequestId slotRequestId) {
    +		return allTaskSlots.containsKey(slotRequestId);
    +	}
    +
    +	/**
    +	 * Looks up the task slot registered under the given slot request id.
    +	 *
    +	 * @param slotRequestId to look up
    +	 * @return the registered task slot, or null if there is none
    +	 */
    +	@Nullable
    +	public TaskSlot getTaskSlot(SlotRequestId slotRequestId) {
    +		final TaskSlot registeredSlot = allTaskSlots.get(slotRequestId);
    +		return registeredSlot;
    +	}
    +
    +	/**
    +	 * Creates a new root slot with the given {@link SlotRequestId}, {@link SlotContext}
future and
    +	 * the {@link SlotRequestId} of the allocated slot.
    +	 *
    +	 * @param slotRequestId of the root slot
    +	 * @param slotContextFuture with which we create the root slot
    +	 * @param allocatedSlotRequestId slot request id of the underlying allocated slot which
can be used
    +	 *                               to cancel the pending slot request or release the allocated
slot
    +	 * @return New root slot
    +	 */
    +	public MultiTaskSlot createRootSlot(
    +			SlotRequestId slotRequestId,
    +			CompletableFuture<SlotContext> slotContextFuture,
    +			SlotRequestId allocatedSlotRequestId) {
    +		final MultiTaskSlot rootMultiTaskSlot = new MultiTaskSlot(
    +			slotRequestId,
    +			slotContextFuture,
    +			allocatedSlotRequestId);
    +
    +		allTaskSlots.put(slotRequestId, rootMultiTaskSlot);
    +		unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);
    +
    +		// add the root node to the set of resolved root nodes once the SlotContext future
has been completed
    +		// and we know the slot's TaskManagerLocation
    +		slotContextFuture.whenComplete(
    +			(SlotContext slotContext, Throwable throwable) -> {
    +				if (slotContext != null) {
    +					final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);
    +
    +					if (resolvedRootNode != null) {
    +						final Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent(
    +							slotContext.getTaskManagerLocation(),
    +							taskManagerLocation -> new HashSet<>(4));
    +
    +						innerCollection.add(resolvedRootNode);
    +					}
    +				} else {
    +					rootMultiTaskSlot.release(throwable);
    +				}
    +			});
    +
    +		return rootMultiTaskSlot;
    +	}
    +
    +	/**
    +	 * Gets a resolved root slot which does not yet contain the given groupId. If location
    +	 * preferences are given, they are taken into account when choosing the slot.
    +	 *
    +	 * @param groupId which the returned slot must not contain
    +	 * @param locationPreferences specifying which locations are preferred; may be empty but not null
    +	 * @return the resolved root slot and its locality with respect to the specified location preferences,
    +	 * 		or null if there was no root slot which did not contain the given groupId
    +	 */
    +	@Nullable
    +	public MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, Collection<TaskManagerLocation> locationPreferences) {
    +		final boolean hasLocationPreferences = !Preconditions.checkNotNull(locationPreferences).isEmpty();
    +
    +		return hasLocationPreferences
    +			? getResolvedRootSlotWithLocationPreferences(groupId, locationPreferences)
    +			: getResolvedRootSlotWithoutLocationPreferences(groupId);
    +	}
    +
    +	/**
    +	 * Gets a resolved root slot which does not yet contain the given groupId. The method will try to
    +	 * find a slot of a TaskManager contained in the collection of preferred locations. If there is no
    +	 * such slot with free capacities available, then the method will look for slots of TaskManagers
    +	 * which run on the same machine as one of the TaskManagers in the collection of preferred locations.
    +	 * If there is no such slot, then any slot with free capacities is returned. If there is no such
    +	 * slot, then null is returned.
    +	 *
    +	 * @param groupId which the returned slot must not contain
    +	 * @param locationPreferences specifying which locations are preferred
    +	 * @return the resolved root slot and its locality with respect to the specified location preferences
    +	 * 		or null if there was no root slot which did not contain the given groupId
    +	 */
    +	@Nullable
    +	private MultiTaskSlotLocality getResolvedRootSlotWithLocationPreferences(AbstractID groupId, Collection<TaskManagerLocation> locationPreferences) {
    +		Preconditions.checkNotNull(groupId);
    +		Preconditions.checkNotNull(locationPreferences);
    +		final Set<String> hostnameSet = new HashSet<>();
    +
    +		// LOCAL: a free root slot on one of the preferred TaskManagers
    +		for (TaskManagerLocation locationPreference : locationPreferences) {
    +			// Record the hostname unconditionally: a preferred TaskManager without any resolved root
    +			// slot must still make the slots of other TaskManagers on the same host HOST_LOCAL.
    +			hostnameSet.add(locationPreference.getHostname());
    +
    +			final Set<MultiTaskSlot> multiTaskSlots = resolvedRootSlots.get(locationPreference);
    +
    +			if (multiTaskSlots != null) {
    +				for (MultiTaskSlot multiTaskSlot : multiTaskSlots) {
    +					if (!multiTaskSlot.contains(groupId)) {
    +						return MultiTaskSlotLocality.of(multiTaskSlot, Locality.LOCAL);
    +					}
    +				}
    +			}
    +		}
    +
    +		// HOST_LOCAL: a free root slot on a TaskManager sharing a hostname with a preferred location.
    +		// NON_LOCAL: remember the first other free root slot as fallback while scanning.
    +		MultiTaskSlot nonLocalMultiTaskSlot = null;
    +
    +		for (Map.Entry<TaskManagerLocation, Set<MultiTaskSlot>> taskManagerLocationSetEntry : resolvedRootSlots.entrySet()) {
    +			if (hostnameSet.contains(taskManagerLocationSetEntry.getKey().getHostname())) {
    +				for (MultiTaskSlot multiTaskSlot : taskManagerLocationSetEntry.getValue()) {
    +					if (!multiTaskSlot.contains(groupId)) {
    +						return MultiTaskSlotLocality.of(multiTaskSlot, Locality.HOST_LOCAL);
    +					}
    +				}
    +			} else if (nonLocalMultiTaskSlot == null) {
    +				for (MultiTaskSlot multiTaskSlot : taskManagerLocationSetEntry.getValue()) {
    +					if (!multiTaskSlot.contains(groupId)) {
    +						nonLocalMultiTaskSlot = multiTaskSlot;
    +						// one fallback candidate is enough; keep scanning only for HOST_LOCAL slots
    +						break;
    +					}
    +				}
    +			}
    +		}
    +
    +		if (nonLocalMultiTaskSlot != null) {
    +			return MultiTaskSlotLocality.of(nonLocalMultiTaskSlot, Locality.NON_LOCAL);
    +		} else {
    +			return null;
    +		}
    +	}
    +
    +	/**
    +	 * Gets a resolved slot which does not yet contain the given groupId without any location
    +	 * preferences.
    +	 *
    +	 * @param groupId which the returned slot must not contain
    +	 * @return the resolved slot or null if there was no root slot with free capacities
    +	 */
    +	@Nullable
    +	private MultiTaskSlotLocality getResolvedRootSlotWithoutLocationPreferences(AbstractID
groupId) {
    +		Preconditions.checkNotNull(groupId);
    +
    +		for (Set<MultiTaskSlot> multiTaskSlots : resolvedRootSlots.values()) {
    +			for (MultiTaskSlot multiTaskSlot : multiTaskSlots) {
    +				if (!multiTaskSlot.contains(groupId)) {
    +					return MultiTaskSlotLocality.of(multiTaskSlot, Locality.UNCONSTRAINED);
    +				}
    +			}
    +		}
    +
    +		return null;
    +	}
    +
    +	/**
    +	 * Gets an unresolved slot which does not yet contain the given groupId. An unresolved
    +	 * slot is a slot whose underlying allocated slot has not been allocated yet.
    +	 *
    +	 * @param groupId which the returned slot must not contain
    +	 * @return the unresolved slot or null if there was no root slot with free capacities
    +	 */
    +	@Nullable
    +	public MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
    +		for (MultiTaskSlot multiTaskSlot : unresolvedRootSlots.values()) {
    +			if (!multiTaskSlot.contains(groupId)) {
    +				return multiTaskSlot;
    +			}
    +		}
    +
    +		return null;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	// Inner classes: TaskSlot hierarchy and helper classes
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Holder pairing a selected {@link MultiTaskSlot} with the {@link Locality} of that selection.
    +	 */
    +	public static final class MultiTaskSlotLocality {
    +
    +		private final MultiTaskSlot multiTaskSlot;
    +		private final Locality locality;
    +
    +		/**
    +		 * Creates a new pair.
    +		 *
    +		 * @param multiTaskSlot the selected slot, must not be null
    +		 * @param locality of the selection, must not be null
    +		 */
    +		public MultiTaskSlotLocality(MultiTaskSlot multiTaskSlot, Locality locality) {
    +			this.multiTaskSlot = Preconditions.checkNotNull(multiTaskSlot);
    +			this.locality = Preconditions.checkNotNull(locality);
    +		}
    +
    +		/**
    +		 * Static factory delegating to the public constructor.
    +		 */
    +		public static MultiTaskSlotLocality of(MultiTaskSlot multiTaskSlot, Locality locality) {
    +			return new MultiTaskSlotLocality(multiTaskSlot, locality);
    +		}
    +
    +		/** Returns the selected slot. */
    +		public MultiTaskSlot getMultiTaskSlot() {
    +			return multiTaskSlot;
    +		}
    +
    +		/** Returns the locality of the selection. */
    +		public Locality getLocality() {
    +			return locality;
    +		}
    +	}
    +
    +	/**
    +	 * Base class for all nodes of the task slot hierarchy.
    +	 */
    +	public abstract static class TaskSlot {
    +		// slot request id with which this task slot was requested; never null
    +		private final SlotRequestId slotRequestId;
    +
    +		// task or co-location constraint of this slot; only root slots have none (null)
    +		@Nullable
    +		private final AbstractID groupId;
    +
    +		protected TaskSlot(SlotRequestId slotRequestId, @Nullable AbstractID groupId) {
    +			this.groupId = groupId;
    +			this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
    +		}
    +
    +		/** Returns the slot request id of this task slot. */
    +		public SlotRequestId getSlotRequestId() {
    +			return slotRequestId;
    +		}
    +
    +		/** Returns the group id of this task slot, or null for a root slot. */
    +		@Nullable
    +		public AbstractID getGroupId() {
    +			return groupId;
    +		}
    +
    +		/**
    +		 * Checks whether the task slot contains the given groupId. This base implementation only
    +		 * compares against the slot's own group id; subclasses may widen the check.
    +		 *
    +		 * @param groupId which to check whether it is contained
    +		 * @return true if the task slot contains the given groupId, otherwise false
    +		 */
    +		public boolean contains(AbstractID groupId) {
    +			return Objects.equals(this.groupId, groupId);
    +		}
    +
    +		/**
    +		 * Releases the task slot.
    +		 *
    +		 * @param cause for the release
    +		 * @return true if the slot could be released, otherwise false
    +		 */
    +		public abstract boolean release(Throwable cause);
    +	}
    +
    +	/**
    +	 * {@link TaskSlot} implementation which can have multiple other task slots assigned as children.
    +	 */
    +	public final class MultiTaskSlot extends TaskSlot implements AllocatedSlot.Payload {
    +
    +		// child task slots, keyed by the group id under which they were registered
    +		private final Map<AbstractID, TaskSlot> children;
    +
    +		// the root node has its parent set to null
    +		@Nullable
    +		private final MultiTaskSlot parent;
    +
    +		// future of the underlying allocated slot; nested multi task slots reuse their parent's future
    +		private final CompletableFuture<SlotContext> slotContextFuture;
    +
    +		// slot request id of the allocated slot; the nested-slot constructor passes null here
    +		@Nullable
    +		private final SlotRequestId allocatedSlotRequestId;
    +
    +		// true if we are currently releasing our children
    +		private boolean releasingChildren;
    +
    +		/**
    +		 * Creates a nested multi task slot registered under the given groupId which shares the
    +		 * slot context future of its parent and has no allocated slot request id.
    +		 */
    +		private MultiTaskSlot(
    +				SlotRequestId slotRequestId,
    +				AbstractID groupId,
    +				MultiTaskSlot parent) {
    +			// arguments are evaluated left to right, so the null check on parent runs before
    +			// parent.getSlotContextFuture() is called
    +			this(
    +				slotRequestId,
    +				groupId,
    +				Preconditions.checkNotNull(parent),
    +				parent.getSlotContextFuture(),
    +				null);
    +		}
    +
    +		/**
    +		 * Creates a root multi task slot, i.e. one without group id and without parent.
    +		 */
    +		private MultiTaskSlot(
    +				SlotRequestId slotRequestId,
    +				CompletableFuture<SlotContext> slotContextFuture,
    +				SlotRequestId allocatedSlotRequestId) {
    +			this(
    +				slotRequestId,
    +				null,
    +				null,
    +				slotContextFuture,
    +				allocatedSlotRequestId);
    +		}
    +
    +		/**
    +		 * Initializes all fields and registers a callback that releases this slot when the slot
    +		 * context future completes exceptionally.
    +		 */
    +		private MultiTaskSlot(
    +				SlotRequestId slotRequestId,
    +				@Nullable AbstractID groupId,
    +				MultiTaskSlot parent,
    +				CompletableFuture<SlotContext> slotContextFuture,
    +				SlotRequestId allocatedSlotRequestId) {
    +			super(slotRequestId, groupId);
    +
    +			this.parent = parent;
    +			this.slotContextFuture = Preconditions.checkNotNull(slotContextFuture);
    +			this.allocatedSlotRequestId = allocatedSlotRequestId;
    +
    +			this.children = new HashMap<>(16);
    +			this.releasingChildren = false;
    +
    +			// NOTE(review): if the future has already failed, whenComplete runs the callback
    +			// immediately, i.e. release() is invoked before this constructor returns.
    +			slotContextFuture.whenComplete(
    +				(SlotContext ignored, Throwable throwable) -> {
    +					if (throwable != null) {
    +						release(throwable);
    +					}
    +				});
    +		}
    +
    +		/**
    +		 * Returns the future of the underlying allocated slot's context.
    +		 *
    +		 * @return slot context future of this task slot
    +		 */
    +		public CompletableFuture<SlotContext> getSlotContextFuture() {
    +			return this.slotContextFuture;
    +		}
    +
    +		/**
    +		 * Allocates a {@link MultiTaskSlot} and registers it under the given groupId at
    +		 * this {@link MultiTaskSlot}.
    +		 *
    +		 * <p>The new slot shares this slot's slot context future and is additionally registered
    +		 * at the enclosing {@link SlotSharingManager}.
    +		 *
    +		 * @param slotRequestId of the new multi task slot
    +		 * @param groupId under which the new multi task slot is registered
    +		 * @return the newly allocated {@link MultiTaskSlot}
    +		 * @throws IllegalStateException if this slot itself has the given groupId or already has a
    +		 *                               child registered under it
    +		 */
    +		MultiTaskSlot allocateMultiTaskSlot(SlotRequestId slotRequestId, AbstractID groupId) {
    +			// super.contains only compares against this slot's own group id; a duplicate child must be
    +			// rejected as well, because children.put would silently replace it and leave the previous
    +			// child orphaned in allTaskSlots
    +			Preconditions.checkState(
    +				!super.contains(groupId) && !children.containsKey(groupId),
    +				"Task slot %s already contains group id %s.",
    +				getSlotRequestId(),
    +				groupId);
    +
    +			final MultiTaskSlot inner = new MultiTaskSlot(
    +				slotRequestId,
    +				groupId,
    +				this);
    +
    +			children.put(groupId, inner);
    +
    +			// register the newly allocated slot also at the SlotSharingManager
    +			allTaskSlots.put(slotRequestId, inner);
    +
    +			return inner;
    +		}
    +
    +		/**
    +		 * Allocates a {@link SingleTaskSlot} and registers it under the given groupId at
    +		 * this {@link MultiTaskSlot}.
    +		 *
    +		 * @param slotRequestId of the new single task slot
    +		 * @param groupId under which the new single task slot is registered
    +		 * @param locality of the allocation
    +		 * @return the newly allocated {@link SingleTaskSlot}
    +		 */
    +		SingleTaskSlot allocateSingleTaskSlot(
    +				SlotRequestId slotRequestId,
    +				AbstractID groupId,
    +				Locality locality) {
    +			Preconditions.checkState(!super.contains(groupId));
    +
    +			final SingleTaskSlot leave = new SingleTaskSlot(
    --- End diff --
    
    will change.


---

Mime
View raw message