Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 90FCD200D66 for ; Thu, 14 Dec 2017 10:14:08 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 8F78E160C01; Thu, 14 Dec 2017 09:14:08 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3BA53160C25 for ; Thu, 14 Dec 2017 10:14:07 +0100 (CET) Received: (qmail 46341 invoked by uid 500); 14 Dec 2017 09:14:06 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 46328 invoked by uid 99); 14 Dec 2017 09:14:06 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Dec 2017 09:14:06 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id DDCB11809C4 for ; Thu, 14 Dec 2017 09:14:05 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -99.202 X-Spam-Level: X-Spam-Status: No, score=-99.202 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001, USER_IN_WHITELIST=-100] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id oGPwEmyl0bvd for ; Thu, 14 Dec 2017 09:14:01 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTP id 0EC175F56A for ; Thu, 14 Dec 2017 09:14:01 +0000 (UTC) Received: from jira-lw-us.apache.org (unknown [207.244.88.139]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id 74C79E0F55 for ; Thu, 14 Dec 2017 09:14:00 +0000 (UTC) Received: from jira-lw-us.apache.org (localhost [127.0.0.1]) by jira-lw-us.apache.org (ASF Mail Server at jira-lw-us.apache.org) with ESMTP id 214B6212F5 for ; Thu, 14 Dec 2017 09:14:00 +0000 (UTC) Date: Thu, 14 Dec 2017 09:14:00 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: issues@flink.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 archived-at: Thu, 14 Dec 2017 09:14:08 -0000 [ https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290591#comment-16290591 ] ASF GitHub Bot commented on FLINK-7956: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r156887866 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * Manager which is responsible for slot sharing. Slot sharing allows to run different + * tasks in the same slot and to realize co-location constraints. + * + *

The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that + * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying + * the request for the TaskSlot and a {@link AbstractID} identifying the task or the + * co-location constraint running in this slot. + * + *

The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and + * {@link SingleTaskSlot}. The former class represents inner nodes which can contain + * a number of other {@link TaskSlot} and the latter class represents the leave nodes. + * The hierarchy starts with a root {@link MultiTaskSlot} which is a future + * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot + * on the TaskExecutor in which all slots of this hierarchy run. A {@link MultiTaskSlot} + * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if + * the task slot does not yet contain another child with the same {@link AbstractID} identifying + * the actual task or the co-location constraint. + * + *

Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set + * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different + * task. + * + *

Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location + * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located + * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to co-located + * multi task slot. + */ +public class SlotSharingManager { + + private final SlotSharingGroupId slotSharingGroupId; + + // needed to release allocated slots after a complete multi task slot hierarchy has been released + private final AllocatedSlotActions allocatedSlotActions; + + // owner of the slots to which to return them when they are released from the outside + private final SlotOwner slotOwner; + + private final Map allTaskSlots; + + // Root nodes which have not been completed because the allocated slot is still pending + private final Map unresolvedRootSlots; + + // Root nodes which have been completed (the underlying allocated slot has been assigned) + private final Map> resolvedRootSlots; + + // Internal class to iterate over all resolved root slots + private ResolvedRootSlotValues resolvedMultiTaskSlotValues; + + public SlotSharingManager( + SlotSharingGroupId slotSharingGroupId, + AllocatedSlotActions allocatedSlotActions, + SlotOwner slotOwner) { + this.slotSharingGroupId = Preconditions.checkNotNull(slotSharingGroupId); + this.allocatedSlotActions = Preconditions.checkNotNull(allocatedSlotActions); + this.slotOwner = Preconditions.checkNotNull(slotOwner); + + allTaskSlots = new HashMap<>(16); + unresolvedRootSlots = new HashMap<>(16); + resolvedRootSlots = new HashMap<>(16); + + resolvedMultiTaskSlotValues = null; + } + + public boolean isEmpty() { + return allTaskSlots.isEmpty(); + } + + public boolean contains(SlotRequestId slotRequestId) { + return allTaskSlots.containsKey(slotRequestId); + } + + @Nullable + public TaskSlot getTaskSlot(SlotRequestId slotRequestId) { + return allTaskSlots.get(slotRequestId); + } + + /** + * Creates a new root slot with the given {@link SlotRequestId}, {@link SlotContext} future and + * the {@link SlotRequestId} of the allocated slot. + * + * @param slotRequestId of the root slot + * @param slotContextFuture with which we create the root slot + * @param allocatedSlotRequestId slot request id of the underlying allocated slot which can be used + * to cancel the pending slot request or release the allocated slot + * @return New root slot + */ + public MultiTaskSlot createRootSlot( + SlotRequestId slotRequestId, + CompletableFuture slotContextFuture, + SlotRequestId allocatedSlotRequestId) { + final MultiTaskSlot rootMultiTaskSlot = new MultiTaskSlot( + slotRequestId, + slotContextFuture, + allocatedSlotRequestId); + + allTaskSlots.put(slotRequestId, rootMultiTaskSlot); + unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot); + + // add the root node to the set of resolved root nodes once the SlotContext future has been completed + // and we know the slot's TaskManagerLocation + slotContextFuture.whenComplete( + (SlotContext slotContext, Throwable throwable) -> { + if (slotContext != null) { + final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId); + + if (resolvedRootNode != null) { + final Set innerCollection = resolvedRootSlots.computeIfAbsent( + slotContext.getTaskManagerLocation(), + taskManagerLocation -> new HashSet<>(4)); + + innerCollection.add(resolvedRootNode); + } + } else { + rootMultiTaskSlot.release(throwable); + } + }); + + return rootMultiTaskSlot; + } + + /** + * Gets a resolved root slot which does not yet contain the given groupId. First the given set of + * preferred locations is checked. + * + * @param groupId which the returned slot must not contain + * @param locationPreferences specifying which locations are preferred + * @return the resolved root slot and its locality wrt to the specified location preferences + * or null if there was no root slot which did not contain the given groupId + */ + @Nullable + public MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, Collection locationPreferences) { + Preconditions.checkNotNull(locationPreferences); + + if (locationPreferences.isEmpty()) { + return getResolvedRootSlotWithoutLocationPreferences(groupId); + } else { + return getResolvedRootSlotWithLocationPreferences(groupId, locationPreferences); + } + } + + /** + * Gets a resolved root slot which does not yet contain the given groupId. The method will try to + * find a slot of a TaskManager contained in the collection of preferred locations. If there is no such slot + * with free capacities available, then the method will look for slots of TaskManager which run on the same + * machine as the TaskManager in the collection of preferred locations. If there is no such slot, then any slot + * with free capacities is returned. If there is no such slot, then null is returned. + * + * @param groupId which the returned slot must not contain + * @param locationPreferences specifying which locations are preferred + * @return the resolved root slot and its locality wrt to the specified location preferences + * or null if there was not root slot which did not contain the given groupId + */ + @Nullable + private MultiTaskSlotLocality getResolvedRootSlotWithLocationPreferences(AbstractID groupId, Collection locationPreferences) { + Preconditions.checkNotNull(groupId); + Preconditions.checkNotNull(locationPreferences); + final Set hostnameSet = new HashSet<>(); + + for (TaskManagerLocation locationPreference : locationPreferences) { + final Set multiTaskSlots = resolvedRootSlots.get(locationPreference); + + if (multiTaskSlots != null) { + for (MultiTaskSlot multiTaskSlot : multiTaskSlots) { + if (!multiTaskSlot.contains(groupId)) { + return MultiTaskSlotLocality.of(multiTaskSlot, Locality.LOCAL); + } + } + + hostnameSet.add(locationPreference.getHostname()); + } + } + + MultiTaskSlot nonLocalMultiTaskSlot = null; + + for (Map.Entry> taskManagerLocationSetEntry : resolvedRootSlots.entrySet()) { + if (hostnameSet.contains(taskManagerLocationSetEntry.getKey().getHostname())) { + for (MultiTaskSlot multiTaskSlot : taskManagerLocationSetEntry.getValue()) { + if (!multiTaskSlot.contains(groupId)) { + return MultiTaskSlotLocality.of(multiTaskSlot, Locality.HOST_LOCAL); + } + } + } else if (nonLocalMultiTaskSlot == null) { + for (MultiTaskSlot multiTaskSlot : taskManagerLocationSetEntry.getValue()) { + if (!multiTaskSlot.contains(groupId)) { + nonLocalMultiTaskSlot = multiTaskSlot; + } + } + } + } + + if (nonLocalMultiTaskSlot != null) { + return MultiTaskSlotLocality.of(nonLocalMultiTaskSlot, Locality.NON_LOCAL); + } else { + return null; + } + } + + /** + * Gets a resolved slot which does not yet contain the given groupId without any location + * preferences. + * + * @param groupId which the returned slot must not contain + * @return the resolved slot or null if there was no root slot with free capacities + */ + @Nullable + private MultiTaskSlotLocality getResolvedRootSlotWithoutLocationPreferences(AbstractID groupId) { + Preconditions.checkNotNull(groupId); + + for (Set multiTaskSlots : resolvedRootSlots.values()) { + for (MultiTaskSlot multiTaskSlot : multiTaskSlots) { + if (!multiTaskSlot.contains(groupId)) { + return MultiTaskSlotLocality.of(multiTaskSlot, Locality.UNCONSTRAINED); + } + } + } + + return null; + } + + /** + * Gets an unresolved slot which does not yet contain the given groupId. An unresolved + * slot is a slot whose underlying allocated slot has not been allocated yet. + * + * @param groupId which the returned slot must not contain + * @return the unresolved slot or null if there was no root slot with free capacities + */ + @Nullable + public MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) { + for (MultiTaskSlot multiTaskSlot : unresolvedRootSlots.values()) { + if (!multiTaskSlot.contains(groupId)) { + return multiTaskSlot; + } + } + + return null; + } + + // ------------------------------------------------------------------------ + // Inner classes: TaskSlot hierarchy and helper classes + // ------------------------------------------------------------------------ + + /** + * Helper class which contains a {@link MultiTaskSlot} and its {@link Locality}. + */ + public static final class MultiTaskSlotLocality { + private final MultiTaskSlot multiTaskSlot; + + private final Locality locality; + + public MultiTaskSlotLocality(MultiTaskSlot multiTaskSlot, Locality locality) { + this.multiTaskSlot = Preconditions.checkNotNull(multiTaskSlot); + this.locality = Preconditions.checkNotNull(locality); + } + + public MultiTaskSlot getMultiTaskSlot() { + return multiTaskSlot; + } + + public Locality getLocality() { + return locality; + } + + public static MultiTaskSlotLocality of(MultiTaskSlot multiTaskSlot, Locality locality) { + return new MultiTaskSlotLocality(multiTaskSlot, locality); + } + } + + /** + * Base class for all task slots. + */ + public abstract static class TaskSlot { + // every TaskSlot has an associated slot request id + private final SlotRequestId slotRequestId; + + // all task slots except for the root slots have a group id assigned + @Nullable + private final AbstractID groupId; + + protected TaskSlot(SlotRequestId slotRequestId, @Nullable AbstractID groupId) { + this.slotRequestId = Preconditions.checkNotNull(slotRequestId); + this.groupId = groupId; + } + + public SlotRequestId getSlotRequestId() { + return slotRequestId; + } + + @Nullable + public AbstractID getGroupId() { + return groupId; + } + + /** + * Check whether the task slot contains the given groupId. + * + * @param groupId which to check whether it is contained + * @return true if the task slot contains the given groupId, otherwise false + */ + public boolean contains(AbstractID groupId) { + return Objects.equals(this.groupId, groupId); + } + + /** + * Release the task slot. + * + * @param cause for the release + * @return true if the slot could be released, otherwise false + */ + public abstract boolean release(Throwable cause); + } + + /** + * {@link TaskSlot} implementation which can have multiple other task slots assigned as children. + */ + public final class MultiTaskSlot extends TaskSlot implements AllocatedSlot.Payload { + + private final Map children; + + // the root node has its parent set to null + @Nullable + private final MultiTaskSlot parent; + + // underlying allocated slot + private final CompletableFuture slotContextFuture; + + // slot request id of the allocated slot + @Nullable + private final SlotRequestId allocatedSlotRequestId; + + // true if we are currently releasing our children + private boolean releasingChildren; + + private MultiTaskSlot( + SlotRequestId slotRequestId, + AbstractID groupId, + MultiTaskSlot parent) { + this( + slotRequestId, + groupId, + Preconditions.checkNotNull(parent), + parent.getSlotContextFuture(), + null); + } + + private MultiTaskSlot( + SlotRequestId slotRequestId, + CompletableFuture slotContextFuture, + SlotRequestId allocatedSlotRequestId) { + this( + slotRequestId, + null, + null, + slotContextFuture, + allocatedSlotRequestId); + } + + private MultiTaskSlot( + SlotRequestId slotRequestId, + @Nullable AbstractID groupId, + MultiTaskSlot parent, + CompletableFuture slotContextFuture, + SlotRequestId allocatedSlotRequestId) { + super(slotRequestId, groupId); + + this.parent = parent; + this.slotContextFuture = Preconditions.checkNotNull(slotContextFuture); + this.allocatedSlotRequestId = allocatedSlotRequestId; + + this.children = new HashMap<>(16); + this.releasingChildren = false; + + slotContextFuture.whenComplete( + (SlotContext ignored, Throwable throwable) -> { + if (throwable != null) { + release(throwable); + } + }); + } + + public CompletableFuture getSlotContextFuture() { + return slotContextFuture; + } + + /** + * Allocates a {@link MultiTaskSlot} and registers it under the given groupId at + * this {@link MultiTaskSlot}. + * + * @param slotRequestId of the new multi task slot + * @param groupId under which the new multi task slot is registered + * @return the newly allocated {@link MultiTaskSlot} + */ + MultiTaskSlot allocateMultiTaskSlot(SlotRequestId slotRequestId, AbstractID groupId) { + Preconditions.checkState(!super.contains(groupId)); + + final MultiTaskSlot inner = new MultiTaskSlot( + slotRequestId, + groupId, + this); + + children.put(groupId, inner); + + // register the newly allocated slot also at the SlotSharingManager + allTaskSlots.put(slotRequestId, inner); + + return inner; + } + + /** + * Allocates a {@link SingleTaskSlot} and registeres it under the given groupId at + * this {@link MultiTaskSlot}. + * + * @param slotRequestId of the new single task slot + * @param groupId under which the new single task slot is registered + * @param locality of the allocation + * @return the newly allocated {@link SingleTaskSlot} + */ + SingleTaskSlot allocateSingleTaskSlot( + SlotRequestId slotRequestId, + AbstractID groupId, + Locality locality) { + Preconditions.checkState(!super.contains(groupId)); + + final SingleTaskSlot leave = new SingleTaskSlot( + slotRequestId, + groupId, + this, + locality); + + children.put(groupId, leave); + + // register the newly allocated slot also at the SlotSharingManager + allTaskSlots.put(slotRequestId, leave); + + return leave; + } + + /** + * Checks whether this slot or any of its children contains the given groupId. + * + * @param groupId which to check whether it is contained + * @return true if this or any of its children contains the given groupId, otherwise false + */ + @Override + public boolean contains(AbstractID groupId) { + if (super.contains(groupId)) { + return true; + } else { + for (TaskSlot taskSlot : children.values()) { + if (taskSlot.contains(groupId)) { + return true; + } + } + + return false; + } + } + + @Override + public boolean release(Throwable cause) { + releasingChildren = true; + + // first release all children and remove them if they could be released immediately + children.values().removeIf(node -> { + boolean release = node.release(cause); + + if (release) { + allTaskSlots.remove(node.slotRequestId); + } + + return release; + }); + + releasingChildren = false; + + if (children.isEmpty()) { + if (parent != null) { + // we remove ourselves from our parent if we no longer have children + parent.releaseChild(getGroupId()); + } else { + // we are the root node --> remove the root node from the list of task slots + allTaskSlots.remove(getSlotRequestId()); + + if (!slotContextFuture.isDone() || slotContextFuture.isCompletedExceptionally()) { + // the root node should still be unresolved + unresolvedRootSlots.remove(getSlotRequestId()); + } else { + // the root node should be resolved --> we can access the slot context + final SlotContext slotContext = slotContextFuture.getNow(null); + + if (slotContext != null) { + final Set multiTaskSlots = resolvedRootSlots.get(slotContext.getTaskManagerLocation()); + + if (multiTaskSlots != null) { + multiTaskSlots.remove(this); + + if (multiTaskSlots.isEmpty()) { + resolvedRootSlots.remove(slotContext.getTaskManagerLocation()); + } + } + } + } + + // release the underlying allocated slot + allocatedSlotActions.releaseSlot(allocatedSlotRequestId, null, cause); + } + + return true; + } else { + return false; + } + } + + /** + * Releases the child with the given childGroupId. + * + * @param childGroupId identifying the child to release + */ + private void releaseChild(AbstractID childGroupId) { + if (!releasingChildren) { + TaskSlot child = children.remove(childGroupId); + + if (child != null) { + allTaskSlots.remove(child.getSlotRequestId()); + } + + if (children.isEmpty()) { + release(new FlinkException("Release multi task slot because all children have been released.")); + } + } + } + } + + /** + * {@link TaskSlot} implementation which harbours a {@link LogicalSlot}. The {@link SingleTaskSlot} + * cannot have any children assigned. + */ + public final class SingleTaskSlot extends TaskSlot { + private final MultiTaskSlot parent; + + // future containing a LogicalSlot which is completed once the underlying SlotContext future is completed + private final CompletableFuture logicalSlotFuture; + + private SingleTaskSlot( + SlotRequestId slotRequestId, + AbstractID groupId, + MultiTaskSlot parent, + Locality locality) { + super(slotRequestId, groupId); + + this.parent = Preconditions.checkNotNull(parent); + + Preconditions.checkNotNull(locality); + logicalSlotFuture = parent.getSlotContextFuture() + .thenApply( + (SlotContext slotContext) -> + new SingleLogicalSlot( + slotRequestId, + slotContext, + slotSharingGroupId, + locality, + slotOwner)); + } + + public CompletableFuture getLogicalSlotFuture() { + return logicalSlotFuture; + } + + @Override + public boolean release(Throwable cause) { + logicalSlotFuture.completeExceptionally(cause); + + boolean pendingLogicalSlotRelease = false; + + if (logicalSlotFuture.isDone() && !logicalSlotFuture.isCompletedExceptionally()) { + // we have a single task slot which we first have to release + final LogicalSlot logicalSlot = logicalSlotFuture.getNow(null); + + if (logicalSlot != null && logicalSlot.isAlive()) { + pendingLogicalSlotRelease = logicalSlot.releaseSlot(cause).isDone(); + } + } + + if (!pendingLogicalSlotRelease) { + parent.releaseChild(getGroupId()); + } + + return !pendingLogicalSlotRelease; + } + } + + // ------------------------------------------------------------------------ + // Methods and classes for testing + // ------------------------------------------------------------------------ + + /** + * Returns a collection of all resolved root slots. + * + * @return Collection of all resolved root slots + */ + @VisibleForTesting + public Collection getResolvedRootSlots() { + ResolvedRootSlotValues vs = resolvedMultiTaskSlotValues; + + if (vs == null ){ + vs = new ResolvedRootSlotValues(); + resolvedMultiTaskSlotValues = vs; + } + + return vs; + } + + @VisibleForTesting + Collection getUnresolvedRootSlots() { + return unresolvedRootSlots.values(); + } + + /** + * Collection of all resolved {@link MultiTaskSlot} root slots. + */ + private final class ResolvedRootSlotValues extends AbstractCollection { + + @Override + public Iterator iterator() { + return new ResolvedRootSlotIterator(resolvedRootSlots.values().iterator()); + } + + @Override + public int size() { + int numberResolvedMultiTaskSlots = 0; + + for (Set multiTaskSlots : resolvedRootSlots.values()) { + numberResolvedMultiTaskSlots += multiTaskSlots.size(); + } + + return numberResolvedMultiTaskSlots; + } + } + + /** + * Iterator over all resolved {@link MultiTaskSlot} root slots. + */ + private static final class ResolvedRootSlotIterator implements Iterator { + private final Iterator> baseIterator; + private Iterator currentIterator; + + private ResolvedRootSlotIterator(Iterator> baseIterator) { + this.baseIterator = Preconditions.checkNotNull(baseIterator); + + if (baseIterator.hasNext()) { + currentIterator = baseIterator.next().iterator(); + } else { + currentIterator = Collections.emptyIterator(); + } + } + + @Override + public boolean hasNext() { + progressToNextElement(); + + return currentIterator.hasNext(); + } + + @Override + public MultiTaskSlot next() { + progressToNextElement(); + + return currentIterator.next(); + } + + private void progressToNextElement() { + while(baseIterator.hasNext() && ! currentIterator.hasNext()) { --- End diff -- Good catch. Will correct it. > Add support for scheduling with slot sharing > -------------------------------------------- > > Key: FLINK-7956 > URL: https://issues.apache.org/jira/browse/FLINK-7956 > Project: Flink > Issue Type: Sub-task > Components: Scheduler > Affects Versions: 1.4.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Labels: flip-6 > > In order to reach feature equivalence with the old code base, we should add support for scheduling with slot sharing to the {{SlotPool}}. This will also allow us to run all the IT cases based on the {{AbstractTestBase}} on the Flip-6 {{MiniCluster}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)