flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing
Date Thu, 14 Dec 2017 09:14:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290591#comment-16290591 ]

ASF GitHub Bot commented on FLINK-7956:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5091#discussion_r156887866
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java ---
    @@ -0,0 +1,722 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster.slotpool;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.runtime.jobmaster.LogicalSlot;
    +import org.apache.flink.runtime.jobmaster.SlotContext;
    +import org.apache.flink.runtime.jobmaster.SlotOwner;
    +import org.apache.flink.runtime.jobmaster.SlotRequestId;
    +import org.apache.flink.runtime.instance.SlotSharingGroupId;
    +import org.apache.flink.runtime.jobmanager.scheduler.Locality;
    +import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
    +import org.apache.flink.util.AbstractID;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nullable;
    +
    +import java.util.AbstractCollection;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.CompletableFuture;
    +
    +/**
    + * Manager which is responsible for slot sharing. Slot sharing allows different
    + * tasks to run in the same slot and makes it possible to realize co-location constraints.
    + *
    + * <p>The SlotSharingManager allows creating a hierarchy of {@link TaskSlot} such that
    + * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying
    + * the request for the TaskSlot and an {@link AbstractID} identifying the task or the
    + * co-location constraint running in this slot.
    + *
    + * <p>The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and
    + * {@link SingleTaskSlot}. The former class represents inner nodes which can contain
    + * a number of other {@link TaskSlot} and the latter class represents the leaf nodes.
    + * The hierarchy starts with a root {@link MultiTaskSlot} which is assigned a future
    + * {@link SlotContext}. The {@link SlotContext} represents the allocated slot
    + * on the TaskExecutor in which all slots of this hierarchy run. A {@link MultiTaskSlot}
    + * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if
    + * the task slot does not yet contain another child with the same {@link AbstractID} identifying
    + * the actual task or the co-location constraint.
    + *
    + * <p>Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set
    + * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different
    + * task.
    + *
    + * <p>Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location
    + * constraint is uniquely identified by an {@link AbstractID} such that we cannot add a second co-located
    + * {@link MultiTaskSlot} to the same root node. All co-located tasks are then added to the co-located
    + * multi task slot.
    + */
    +public class SlotSharingManager {
    +
    +	private final SlotSharingGroupId slotSharingGroupId;
    +
    +	// needed to release allocated slots after a complete multi task slot hierarchy has been released
    +	private final AllocatedSlotActions allocatedSlotActions;
    +
    +	// owner of the slots to which to return them when they are released from the outside
    +	private final SlotOwner slotOwner;
    +
    +	private final Map<SlotRequestId, TaskSlot> allTaskSlots;
    +
    +	// Root nodes which have not been completed because the allocated slot is still pending
    +	private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots;
    +
    +	// Root nodes which have been completed (the underlying allocated slot has been assigned)
    +	private final Map<TaskManagerLocation, Set<MultiTaskSlot>> resolvedRootSlots;
    +
    +	// Internal class to iterate over all resolved root slots
    +	private ResolvedRootSlotValues resolvedMultiTaskSlotValues;
    +
    +	public SlotSharingManager(
    +			SlotSharingGroupId slotSharingGroupId,
    +			AllocatedSlotActions allocatedSlotActions,
    +			SlotOwner slotOwner) {
    +		this.slotSharingGroupId = Preconditions.checkNotNull(slotSharingGroupId);
    +		this.allocatedSlotActions = Preconditions.checkNotNull(allocatedSlotActions);
    +		this.slotOwner = Preconditions.checkNotNull(slotOwner);
    +
    +		allTaskSlots = new HashMap<>(16);
    +		unresolvedRootSlots = new HashMap<>(16);
    +		resolvedRootSlots = new HashMap<>(16);
    +
    +		resolvedMultiTaskSlotValues = null;
    +	}
    +
    +	public boolean isEmpty() {
    +		return allTaskSlots.isEmpty();
    +	}
    +
    +	public boolean contains(SlotRequestId slotRequestId) {
    +		return allTaskSlots.containsKey(slotRequestId);
    +	}
    +
    +	@Nullable
    +	public TaskSlot getTaskSlot(SlotRequestId slotRequestId) {
    +		return allTaskSlots.get(slotRequestId);
    +	}
    +
    +	/**
    +	 * Creates a new root slot with the given {@link SlotRequestId}, {@link SlotContext} future and
    +	 * the {@link SlotRequestId} of the allocated slot.
    +	 *
    +	 * @param slotRequestId of the root slot
    +	 * @param slotContextFuture with which we create the root slot
    +	 * @param allocatedSlotRequestId slot request id of the underlying allocated slot which can be used
    +	 *                               to cancel the pending slot request or release the allocated slot
    +	 * @return New root slot
    +	 */
    +	public MultiTaskSlot createRootSlot(
    +			SlotRequestId slotRequestId,
    +			CompletableFuture<SlotContext> slotContextFuture,
    +			SlotRequestId allocatedSlotRequestId) {
    +		final MultiTaskSlot rootMultiTaskSlot = new MultiTaskSlot(
    +			slotRequestId,
    +			slotContextFuture,
    +			allocatedSlotRequestId);
    +
    +		allTaskSlots.put(slotRequestId, rootMultiTaskSlot);
    +		unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);
    +
    +		// add the root node to the set of resolved root nodes once the SlotContext future has been completed
    +		// and we know the slot's TaskManagerLocation
    +		slotContextFuture.whenComplete(
    +			(SlotContext slotContext, Throwable throwable) -> {
    +				if (slotContext != null) {
    +					final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);
    +
    +					if (resolvedRootNode != null) {
    +						final Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent(
    +							slotContext.getTaskManagerLocation(),
    +							taskManagerLocation -> new HashSet<>(4));
    +
    +						innerCollection.add(resolvedRootNode);
    +					}
    +				} else {
    +					rootMultiTaskSlot.release(throwable);
    +				}
    +			});
    +
    +		return rootMultiTaskSlot;
    +	}
    +
    +	/**
    +	 * Gets a resolved root slot which does not yet contain the given groupId. First the given set of
    +	 * preferred locations is checked.
    +	 *
    +	 * @param groupId which the returned slot must not contain
    +	 * @param locationPreferences specifying which locations are preferred
    +	 * @return the resolved root slot and its locality with respect to the specified location preferences
    +	 * 		or null if there was no root slot which did not contain the given groupId
    +	 */
    +	@Nullable
    +	public MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, Collection<TaskManagerLocation> locationPreferences) {
    +		Preconditions.checkNotNull(locationPreferences);
    +
    +		if (locationPreferences.isEmpty()) {
    +			return getResolvedRootSlotWithoutLocationPreferences(groupId);
    +		} else {
    +			return getResolvedRootSlotWithLocationPreferences(groupId, locationPreferences);
    +		}
    +	}
    +
    +	/**
    +	 * Gets a resolved root slot which does not yet contain the given groupId. The method will try to
    +	 * find a slot of a TaskManager contained in the collection of preferred locations. If there is no such slot
    +	 * with free capacities available, then the method will look for slots of TaskManagers which run on the same
    +	 * machine as a TaskManager in the collection of preferred locations. If there is no such slot, then any slot
    +	 * with free capacities is returned. If there is no such slot, then null is returned.
    +	 *
    +	 * @param groupId which the returned slot must not contain
    +	 * @param locationPreferences specifying which locations are preferred
    +	 * @return the resolved root slot and its locality with respect to the specified location preferences
    +	 * 		or null if there was no root slot which did not contain the given groupId
    +	 */
    +	@Nullable
    +	private MultiTaskSlotLocality getResolvedRootSlotWithLocationPreferences(AbstractID groupId, Collection<TaskManagerLocation> locationPreferences) {
    +		Preconditions.checkNotNull(groupId);
    +		Preconditions.checkNotNull(locationPreferences);
    +		final Set<String> hostnameSet = new HashSet<>();
    +
    +		for (TaskManagerLocation locationPreference : locationPreferences) {
    +			final Set<MultiTaskSlot> multiTaskSlots = resolvedRootSlots.get(locationPreference);
    +
    +			if (multiTaskSlots != null) {
    +				for (MultiTaskSlot multiTaskSlot : multiTaskSlots) {
    +					if (!multiTaskSlot.contains(groupId)) {
    +						return MultiTaskSlotLocality.of(multiTaskSlot, Locality.LOCAL);
    +					}
    +				}
    +
    +				hostnameSet.add(locationPreference.getHostname());
    +			}
    +		}
    +
    +		MultiTaskSlot nonLocalMultiTaskSlot = null;
    +
    +		for (Map.Entry<TaskManagerLocation, Set<MultiTaskSlot>> taskManagerLocationSetEntry : resolvedRootSlots.entrySet()) {
    +			if (hostnameSet.contains(taskManagerLocationSetEntry.getKey().getHostname())) {
    +				for (MultiTaskSlot multiTaskSlot : taskManagerLocationSetEntry.getValue()) {
    +					if (!multiTaskSlot.contains(groupId)) {
    +						return MultiTaskSlotLocality.of(multiTaskSlot, Locality.HOST_LOCAL);
    +					}
    +				}
    +			} else if (nonLocalMultiTaskSlot == null) {
    +				for (MultiTaskSlot multiTaskSlot : taskManagerLocationSetEntry.getValue()) {
    +					if (!multiTaskSlot.contains(groupId)) {
    +						nonLocalMultiTaskSlot = multiTaskSlot;
    +					}
    +				}
    +			}
    +		}
    +
    +		if (nonLocalMultiTaskSlot != null) {
    +			return MultiTaskSlotLocality.of(nonLocalMultiTaskSlot, Locality.NON_LOCAL);
    +		} else {
    +			return null;
    +		}
    +	}
    +
    +	/**
    +	 * Gets a resolved slot which does not yet contain the given groupId without any location
    +	 * preferences.
    +	 *
    +	 * @param groupId which the returned slot must not contain
    +	 * @return the resolved slot or null if there was no root slot with free capacities
    +	 */
    +	@Nullable
    +	private MultiTaskSlotLocality getResolvedRootSlotWithoutLocationPreferences(AbstractID groupId) {
    +		Preconditions.checkNotNull(groupId);
    +
    +		for (Set<MultiTaskSlot> multiTaskSlots : resolvedRootSlots.values()) {
    +			for (MultiTaskSlot multiTaskSlot : multiTaskSlots) {
    +				if (!multiTaskSlot.contains(groupId)) {
    +					return MultiTaskSlotLocality.of(multiTaskSlot, Locality.UNCONSTRAINED);
    +				}
    +			}
    +		}
    +
    +		return null;
    +	}
    +
    +	/**
    +	 * Gets an unresolved slot which does not yet contain the given groupId. An unresolved
    +	 * slot is a slot whose underlying allocated slot has not been allocated yet.
    +	 *
    +	 * @param groupId which the returned slot must not contain
    +	 * @return the unresolved slot or null if there was no root slot with free capacities
    +	 */
    +	@Nullable
    +	public MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
    +		for (MultiTaskSlot multiTaskSlot : unresolvedRootSlots.values()) {
    +			if (!multiTaskSlot.contains(groupId)) {
    +				return multiTaskSlot;
    +			}
    +		}
    +
    +		return null;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	// Inner classes: TaskSlot hierarchy and helper classes
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Helper class which contains a {@link MultiTaskSlot} and its {@link Locality}.
    +	 */
    +	public static final class MultiTaskSlotLocality {
    +		private final MultiTaskSlot multiTaskSlot;
    +
    +		private final Locality locality;
    +
    +		public MultiTaskSlotLocality(MultiTaskSlot multiTaskSlot, Locality locality) {
    +			this.multiTaskSlot = Preconditions.checkNotNull(multiTaskSlot);
    +			this.locality = Preconditions.checkNotNull(locality);
    +		}
    +
    +		public MultiTaskSlot getMultiTaskSlot() {
    +			return multiTaskSlot;
    +		}
    +
    +		public Locality getLocality() {
    +			return locality;
    +		}
    +
    +		public static MultiTaskSlotLocality of(MultiTaskSlot multiTaskSlot, Locality locality) {
    +			return new MultiTaskSlotLocality(multiTaskSlot, locality);
    +		}
    +	}
    +
    +	/**
    +	 * Base class for all task slots.
    +	 */
    +	public abstract static class TaskSlot {
    +		// every TaskSlot has an associated slot request id
    +		private final SlotRequestId slotRequestId;
    +
    +		// all task slots except for the root slots have a group id assigned
    +		@Nullable
    +		private final AbstractID groupId;
    +
    +		protected TaskSlot(SlotRequestId slotRequestId, @Nullable AbstractID groupId) {
    +			this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
    +			this.groupId = groupId;
    +		}
    +
    +		public SlotRequestId getSlotRequestId() {
    +			return slotRequestId;
    +		}
    +
    +		@Nullable
    +		public AbstractID getGroupId() {
    +			return groupId;
    +		}
    +
    +		/**
    +		 * Check whether the task slot contains the given groupId.
    +		 *
    +		 * @param groupId which to check whether it is contained
    +		 * @return true if the task slot contains the given groupId, otherwise false
    +		 */
    +		public boolean contains(AbstractID groupId) {
    +			return Objects.equals(this.groupId, groupId);
    +		}
    +
    +		/**
    +		 * Release the task slot.
    +		 *
    +		 * @param cause for the release
    +		 * @return true if the slot could be released, otherwise false
    +		 */
    +		public abstract boolean release(Throwable cause);
    +	}
    +
    +	/**
    +	 * {@link TaskSlot} implementation which can have multiple other task slots assigned as children.
    +	 */
    +	public final class MultiTaskSlot extends TaskSlot implements AllocatedSlot.Payload {
    +
    +		private final Map<AbstractID, TaskSlot> children;
    +
    +		// the root node has its parent set to null
    +		@Nullable
    +		private final MultiTaskSlot parent;
    +
    +		// underlying allocated slot
    +		private final CompletableFuture<SlotContext> slotContextFuture;
    +
    +		// slot request id of the allocated slot
    +		@Nullable
    +		private final SlotRequestId allocatedSlotRequestId;
    +
    +		// true if we are currently releasing our children
    +		private boolean releasingChildren;
    +
    +		private MultiTaskSlot(
    +				SlotRequestId slotRequestId,
    +				AbstractID groupId,
    +				MultiTaskSlot parent) {
    +			this(
    +				slotRequestId,
    +				groupId,
    +				Preconditions.checkNotNull(parent),
    +				parent.getSlotContextFuture(),
    +				null);
    +		}
    +
    +		private MultiTaskSlot(
    +				SlotRequestId slotRequestId,
    +				CompletableFuture<SlotContext> slotContextFuture,
    +				SlotRequestId allocatedSlotRequestId) {
    +			this(
    +				slotRequestId,
    +				null,
    +				null,
    +				slotContextFuture,
    +				allocatedSlotRequestId);
    +		}
    +
    +		private MultiTaskSlot(
    +				SlotRequestId slotRequestId,
    +				@Nullable AbstractID groupId,
    +				MultiTaskSlot parent,
    +				CompletableFuture<SlotContext> slotContextFuture,
    +				SlotRequestId allocatedSlotRequestId) {
    +			super(slotRequestId, groupId);
    +
    +			this.parent = parent;
    +			this.slotContextFuture = Preconditions.checkNotNull(slotContextFuture);
    +			this.allocatedSlotRequestId = allocatedSlotRequestId;
    +
    +			this.children = new HashMap<>(16);
    +			this.releasingChildren = false;
    +
    +			slotContextFuture.whenComplete(
    +				(SlotContext ignored, Throwable throwable) -> {
    +					if (throwable != null) {
    +						release(throwable);
    +					}
    +				});
    +		}
    +
    +		public CompletableFuture<SlotContext> getSlotContextFuture() {
    +			return slotContextFuture;
    +		}
    +
    +		/**
    +		 * Allocates a {@link MultiTaskSlot} and registers it under the given groupId at
    +		 * this {@link MultiTaskSlot}.
    +		 *
    +		 * @param slotRequestId of the new multi task slot
    +		 * @param groupId under which the new multi task slot is registered
    +		 * @return the newly allocated {@link MultiTaskSlot}
    +		 */
    +		MultiTaskSlot allocateMultiTaskSlot(SlotRequestId slotRequestId, AbstractID groupId) {
    +			Preconditions.checkState(!super.contains(groupId));
    +
    +			final MultiTaskSlot inner = new MultiTaskSlot(
    +				slotRequestId,
    +				groupId,
    +				this);
    +
    +			children.put(groupId, inner);
    +
    +			// register the newly allocated slot also at the SlotSharingManager
    +			allTaskSlots.put(slotRequestId, inner);
    +
    +			return inner;
    +		}
    +
    +		/**
    +		 * Allocates a {@link SingleTaskSlot} and registers it under the given groupId at
    +		 * this {@link MultiTaskSlot}.
    +		 *
    +		 * @param slotRequestId of the new single task slot
    +		 * @param groupId under which the new single task slot is registered
    +		 * @param locality of the allocation
    +		 * @return the newly allocated {@link SingleTaskSlot}
    +		 */
    +		SingleTaskSlot allocateSingleTaskSlot(
    +				SlotRequestId slotRequestId,
    +				AbstractID groupId,
    +				Locality locality) {
    +			Preconditions.checkState(!super.contains(groupId));
    +
    +			final SingleTaskSlot leave = new SingleTaskSlot(
    +				slotRequestId,
    +				groupId,
    +				this,
    +				locality);
    +
    +			children.put(groupId, leave);
    +
    +			// register the newly allocated slot also at the SlotSharingManager
    +			allTaskSlots.put(slotRequestId, leave);
    +
    +			return leave;
    +		}
    +
    +		/**
    +		 * Checks whether this slot or any of its children contains the given groupId.
    +		 *
    +		 * @param groupId which to check whether it is contained
    +		 * @return true if this or any of its children contains the given groupId, otherwise false
    +		 */
    +		@Override
    +		public boolean contains(AbstractID groupId) {
    +			if (super.contains(groupId)) {
    +				return true;
    +			} else {
    +				for (TaskSlot taskSlot : children.values()) {
    +					if (taskSlot.contains(groupId)) {
    +						return true;
    +					}
    +				}
    +
    +				return false;
    +			}
    +		}
    +
    +		@Override
    +		public boolean release(Throwable cause) {
    +			releasingChildren = true;
    +
    +			// first release all children and remove them if they could be released immediately
    +			children.values().removeIf(node -> {
    +				boolean release = node.release(cause);
    +
    +				if (release) {
    +					allTaskSlots.remove(node.slotRequestId);
    +				}
    +
    +				return release;
    +			});
    +
    +			releasingChildren = false;
    +
    +			if (children.isEmpty()) {
    +				if (parent != null) {
    +					// we remove ourselves from our parent if we no longer have children
    +					parent.releaseChild(getGroupId());
    +				} else {
    +					// we are the root node --> remove the root node from the list of task slots
    +					allTaskSlots.remove(getSlotRequestId());
    +
    +					if (!slotContextFuture.isDone() || slotContextFuture.isCompletedExceptionally()) {
    +						// the root node should still be unresolved
    +						unresolvedRootSlots.remove(getSlotRequestId());
    +					} else {
    +						// the root node should be resolved --> we can access the slot context
    +						final SlotContext slotContext = slotContextFuture.getNow(null);
    +
    +						if (slotContext != null) {
    +							final Set<MultiTaskSlot> multiTaskSlots = resolvedRootSlots.get(slotContext.getTaskManagerLocation());
    +
    +							if (multiTaskSlots != null) {
    +								multiTaskSlots.remove(this);
    +
    +								if (multiTaskSlots.isEmpty()) {
    +									resolvedRootSlots.remove(slotContext.getTaskManagerLocation());
    +								}
    +							}
    +						}
    +					}
    +
    +					// release the underlying allocated slot
    +					allocatedSlotActions.releaseSlot(allocatedSlotRequestId, null, cause);
    +				}
    +
    +				return true;
    +			} else {
    +				return false;
    +			}
    +		}
    +
    +		/**
    +		 * Releases the child with the given childGroupId.
    +		 *
    +		 * @param childGroupId identifying the child to release
    +		 */
    +		private void releaseChild(AbstractID childGroupId) {
    +			if (!releasingChildren) {
    +				TaskSlot child = children.remove(childGroupId);
    +
    +				if (child != null) {
    +					allTaskSlots.remove(child.getSlotRequestId());
    +				}
    +
    +				if (children.isEmpty()) {
    +					release(new FlinkException("Release multi task slot because all children have been released."));
    +				}
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * {@link TaskSlot} implementation which harbours a {@link LogicalSlot}. The {@link SingleTaskSlot}
    +	 * cannot have any children assigned.
    +	 */
    +	public final class SingleTaskSlot extends TaskSlot {
    +		private final MultiTaskSlot parent;
    +
    +		// future containing a LogicalSlot which is completed once the underlying SlotContext future is completed
    +		private final CompletableFuture<LogicalSlot> logicalSlotFuture;
    +
    +		private SingleTaskSlot(
    +				SlotRequestId slotRequestId,
    +				AbstractID groupId,
    +				MultiTaskSlot parent,
    +				Locality locality) {
    +			super(slotRequestId, groupId);
    +
    +			this.parent = Preconditions.checkNotNull(parent);
    +
    +			Preconditions.checkNotNull(locality);
    +			logicalSlotFuture = parent.getSlotContextFuture()
    +				.thenApply(
    +					(SlotContext slotContext) ->
    +						new SingleLogicalSlot(
    +							slotRequestId,
    +							slotContext,
    +							slotSharingGroupId,
    +							locality,
    +							slotOwner));
    +		}
    +
    +		public CompletableFuture<LogicalSlot> getLogicalSlotFuture() {
    +			return logicalSlotFuture;
    +		}
    +
    +		@Override
    +		public boolean release(Throwable cause) {
    +			logicalSlotFuture.completeExceptionally(cause);
    +
    +			boolean pendingLogicalSlotRelease = false;
    +
    +			if (logicalSlotFuture.isDone() && !logicalSlotFuture.isCompletedExceptionally()) {
    +				// we have a single task slot which we first have to release
    +				final LogicalSlot logicalSlot = logicalSlotFuture.getNow(null);
    +
    +				if (logicalSlot != null && logicalSlot.isAlive()) {
    +					pendingLogicalSlotRelease = logicalSlot.releaseSlot(cause).isDone();
    +				}
    +			}
    +
    +			if (!pendingLogicalSlotRelease) {
    +				parent.releaseChild(getGroupId());
    +			}
    +
    +			return !pendingLogicalSlotRelease;
    +		}
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	// Methods and classes for testing
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Returns a collection of all resolved root slots.
    +	 *
    +	 * @return Collection of all resolved root slots
    +	 */
    +	@VisibleForTesting
    +	public Collection<MultiTaskSlot> getResolvedRootSlots() {
    +		ResolvedRootSlotValues vs = resolvedMultiTaskSlotValues;
    +
    +		if (vs == null ){
    +			vs = new ResolvedRootSlotValues();
    +			resolvedMultiTaskSlotValues = vs;
    +		}
    +
    +		return vs;
    +	}
    +
    +	@VisibleForTesting
    +	Collection<MultiTaskSlot> getUnresolvedRootSlots() {
    +		return unresolvedRootSlots.values();
    +	}
    +
    +	/**
    +	 * Collection of all resolved {@link MultiTaskSlot} root slots.
    +	 */
    +	private final class ResolvedRootSlotValues extends AbstractCollection<MultiTaskSlot> {
    +
    +		@Override
    +		public Iterator<MultiTaskSlot> iterator() {
    +			return new ResolvedRootSlotIterator(resolvedRootSlots.values().iterator());
    +		}
    +
    +		@Override
    +		public int size() {
    +			int numberResolvedMultiTaskSlots = 0;
    +
    +			for (Set<MultiTaskSlot> multiTaskSlots : resolvedRootSlots.values()) {
    +				numberResolvedMultiTaskSlots += multiTaskSlots.size();
    +			}
    +
    +			return numberResolvedMultiTaskSlots;
    +		}
    +	}
    +
    +	/**
    +	 * Iterator over all resolved {@link MultiTaskSlot} root slots.
    +	 */
    +	private static final class ResolvedRootSlotIterator implements Iterator<MultiTaskSlot> {
    +		private final Iterator<Set<MultiTaskSlot>> baseIterator;
    +		private Iterator<MultiTaskSlot> currentIterator;
    +
    +		private ResolvedRootSlotIterator(Iterator<Set<MultiTaskSlot>> baseIterator) {
    +			this.baseIterator = Preconditions.checkNotNull(baseIterator);
    +
    +			if (baseIterator.hasNext()) {
    +				currentIterator = baseIterator.next().iterator();
    +			} else {
    +				currentIterator = Collections.emptyIterator();
    +			}
    +		}
    +
    +		@Override
    +		public boolean hasNext() {
    +			progressToNextElement();
    +
    +			return currentIterator.hasNext();
    +		}
    +
    +		@Override
    +		public MultiTaskSlot next() {
    +			progressToNextElement();
    +
    +			return currentIterator.next();
    +		}
    +
    +		private void progressToNextElement() {
    +			while(baseIterator.hasNext() && ! currentIterator.hasNext()) {
    --- End diff --
    
    Good catch. Will correct it.
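
    For readers following the thread, the loop quoted last in the diff is the core of a flattening iterator: it walks an iterator of sets and skips exhausted or empty sets until an element is available. The following is only a minimal standalone sketch of that pattern, not the code from the pull request; the class name FlatteningIterator is hypothetical and the element type is generic instead of MultiTaskSlot.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;

/** Hypothetical stand-in for ResolvedRootSlotIterator: flattens an iterator over sets. */
final class FlatteningIterator<T> implements Iterator<T> {
    private final Iterator<? extends Set<T>> baseIterator;

    // Starts out empty; the first call to hasNext() or next() advances it.
    private Iterator<T> currentIterator = Collections.emptyIterator();

    FlatteningIterator(Iterator<? extends Set<T>> baseIterator) {
        this.baseIterator = baseIterator;
    }

    @Override
    public boolean hasNext() {
        progressToNextElement();
        return currentIterator.hasNext();
    }

    @Override
    public T next() {
        progressToNextElement();
        // Throws NoSuchElementException once every inner set is exhausted.
        return currentIterator.next();
    }

    // Skip over exhausted or empty inner sets until an element is available.
    private void progressToNextElement() {
        while (baseIterator.hasNext() && !currentIterator.hasNext()) {
            currentIterator = baseIterator.next().iterator();
        }
    }
}
```

    Advancing in both hasNext() and next() keeps the iterator correct even when a caller invokes next() without checking hasNext() first.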

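
    The location-preference lookup in the diff falls back from LOCAL to HOST_LOCAL to NON_LOCAL. A rough sketch of that ordering follows, under simplifying assumptions: Location and Match are hypothetical stand-ins for TaskManagerLocation and MultiTaskSlotLocality, the groupId check is omitted, and every preferred host counts for the HOST_LOCAL step (the diff only records hosts of preferred locations that already have resolved slots).

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

final class LocalitySketch {
    enum Locality { LOCAL, HOST_LOCAL, NON_LOCAL }

    /** Hypothetical stand-in for TaskManagerLocation. */
    static final class Location {
        final String host;
        final String taskManager;

        Location(String host, String taskManager) {
            this.host = host;
            this.taskManager = taskManager;
        }
    }

    /** Hypothetical stand-in for MultiTaskSlotLocality. */
    static final class Match {
        final Location location;
        final Locality locality;

        Match(Location location, Locality locality) {
            this.location = location;
            this.locality = locality;
        }
    }

    /** Returns the best available location for the preferences, or null if none is available. */
    static Match pick(Collection<Location> available, Collection<Location> preferred) {
        Set<String> preferredTaskManagers = new HashSet<>();
        Set<String> preferredHosts = new HashSet<>();
        for (Location p : preferred) {
            preferredTaskManagers.add(p.taskManager);
            preferredHosts.add(p.host);
        }

        Match best = null;
        for (Location candidate : available) {
            if (preferredTaskManagers.contains(candidate.taskManager)) {
                // Same TaskManager as a preference: cannot do better, return immediately.
                return new Match(candidate, Locality.LOCAL);
            }
            if (preferredHosts.contains(candidate.host)) {
                // Same machine: keep the first such candidate, upgrading a NON_LOCAL fallback.
                if (best == null || best.locality == Locality.NON_LOCAL) {
                    best = new Match(candidate, Locality.HOST_LOCAL);
                }
            } else if (best == null) {
                // Remember the first non-local candidate as a last resort.
                best = new Match(candidate, Locality.NON_LOCAL);
            }
        }
        return best;
    }
}
```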

> Add support for scheduling with slot sharing
> --------------------------------------------
>
>                 Key: FLINK-7956
>                 URL: https://issues.apache.org/jira/browse/FLINK-7956
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Scheduler
>    Affects Versions: 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>              Labels: flip-6
>
> In order to reach feature equivalence with the old code base, we should add support for
> scheduling with slot sharing to the {{SlotPool}}. This will also allow us to run all the IT
> cases based on the {{AbstractTestBase}} on the Flip-6 {{MiniCluster}}.
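
As a rough illustration of what slot sharing means in the diff above: tasks with different group ids can be placed under one root slot, while a second task with an already present group id has to go to another root. The sketch below models this with plain strings as group ids. All class names are hypothetical, futures and release handling are left out, and rejecting a group id found anywhere in the subtree is an assumption of this sketch rather than the exact check in the pull request.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Simplified, hypothetical model of the TaskSlot hierarchy; group ids are plain strings. */
abstract class SketchTaskSlot {
    private final String groupId; // null for a root slot

    SketchTaskSlot(String groupId) {
        this.groupId = groupId;
    }

    boolean contains(String id) {
        return Objects.equals(groupId, id);
    }
}

/** Leaf node: stands for a single task. */
final class SketchSingleTaskSlot extends SketchTaskSlot {
    SketchSingleTaskSlot(String groupId) {
        super(groupId);
    }
}

/** Inner node: the root slot or a co-location group. */
final class SketchMultiTaskSlot extends SketchTaskSlot {
    private final Map<String, SketchTaskSlot> children = new HashMap<>();

    SketchMultiTaskSlot(String groupId) {
        super(groupId);
    }

    SketchSingleTaskSlot allocateSingleTaskSlot(String id) {
        return register(id, new SketchSingleTaskSlot(id));
    }

    SketchMultiTaskSlot allocateMultiTaskSlot(String id) {
        return register(id, new SketchMultiTaskSlot(id));
    }

    // Assumption of this sketch: reject a group id found anywhere in the subtree.
    private <S extends SketchTaskSlot> S register(String id, S slot) {
        if (contains(id)) {
            throw new IllegalStateException("group id already present: " + id);
        }
        children.put(id, slot);
        return slot;
    }

    /** True if this slot or any descendant carries the given group id. */
    @Override
    boolean contains(String id) {
        if (super.contains(id)) {
            return true;
        }
        for (SketchTaskSlot child : children.values()) {
            if (child.contains(id)) {
                return true;
            }
        }
        return false;
    }
}
```

A scheduler built on such a tree would call contains(groupId) on each candidate root and pick the first root that returns false, which is what the getResolvedRootSlot and getUnresolvedRootSlot methods in the diff do.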



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
