Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id DFA9C200ACC for ; Mon, 2 May 2016 18:05:04 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id DE6481609B0; Mon, 2 May 2016 18:05:04 +0200 (CEST) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B33131609A6 for ; Mon, 2 May 2016 18:05:02 +0200 (CEST) Received: (qmail 83972 invoked by uid 500); 2 May 2016 16:05:01 -0000 Mailing-List: contact commits-help@aurora.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.apache.org Delivered-To: mailing list commits@aurora.apache.org Received: (qmail 83963 invoked by uid 99); 2 May 2016 16:05:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 May 2016 16:05:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 875FEDFB79; Mon, 2 May 2016 16:05:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: maxim@apache.org To: commits@aurora.apache.org Message-Id: <584e380d554e4909944f71222c84f885@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: aurora git commit: Generalizing port resource management. Date: Mon, 2 May 2016 16:05:01 +0000 (UTC) archived-at: Mon, 02 May 2016 16:05:05 -0000 Repository: aurora Updated Branches: refs/heads/master 450d88156 -> d8d1c8d68 Generalizing port resource management. Reviewed at https://reviews.apache.org/r/46810/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/d8d1c8d6 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/d8d1c8d6 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/d8d1c8d6 Branch: refs/heads/master Commit: d8d1c8d6889b39c5da0ca95496ef8bda7827fb4a Parents: 450d881 Author: Maxim Khutornenko Authored: Mon May 2 09:04:54 2016 -0700 Committer: Maxim Khutornenko Committed: Mon May 2 09:04:54 2016 -0700 ---------------------------------------------------------------------- .../configuration/ConfigurationManager.java | 19 +- .../scheduler/filter/SchedulingFilter.java | 4 - .../scheduler/resources/ResourceManager.java | 81 +++++++ .../scheduler/resources/ResourceMapper.java | 84 ++++++++ .../scheduler/resources/ResourceSlot.java | 3 +- .../scheduler/resources/ResourceType.java | 43 +++- .../aurora/scheduler/resources/Resources.java | 31 --- .../aurora/scheduler/state/StateManager.java | 7 +- .../scheduler/state/StateManagerImpl.java | 13 +- .../aurora/scheduler/state/TaskAssigner.java | 28 +-- .../configuration/ConfigurationManagerTest.java | 11 + .../filter/SchedulingFilterImplTest.java | 25 +-- .../mesos/MesosTaskFactoryImplTest.java | 7 +- .../scheduler/resources/AcceptedOfferTest.java | 213 ++++++------------- .../scheduler/resources/PortMapperTest.java | 62 ++++++ .../resources/ResourceManagerTest.java | 88 ++++++++ .../scheduler/resources/ResourceTestUtil.java | 85 ++++++++ .../scheduler/resources/ResourcesTest.java | 144 ++----------- .../scheduler/state/StateManagerImplTest.java | 23 +- .../scheduler/state/TaskAssignerImplTest.java | 89 ++++---- 20 files changed, 645 insertions(+), 415 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java index 9a15a4b..e1ce638 100644 --- a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java +++ b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java @@ -38,10 +38,12 @@ import org.apache.aurora.gen.TaskConstraint; import org.apache.aurora.scheduler.TierManager; import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.base.UserProvidedStrings; +import org.apache.aurora.scheduler.resources.ResourceManager; import org.apache.aurora.scheduler.resources.ResourceType; import org.apache.aurora.scheduler.storage.entities.IConstraint; import org.apache.aurora.scheduler.storage.entities.IContainer; import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; +import org.apache.aurora.scheduler.storage.entities.IResource; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.apache.aurora.scheduler.storage.entities.ITaskConstraint; import org.apache.aurora.scheduler.storage.entities.IValueConstraint; @@ -49,6 +51,8 @@ import org.apache.aurora.scheduler.storage.log.ThriftBackfill; import static java.util.Objects.requireNonNull; +import static org.apache.aurora.scheduler.resources.ResourceType.PORTS; + /** * Manages translation from a string-mapped configuration to a concrete configuration type, and * defaults for optional values. @@ -224,8 +228,6 @@ public class ConfigurationManager { builder.setRequestedPorts(ImmutableSet.of()); } - maybeFillLinks(builder); - if (config.isSetTier() && !UserProvidedStrings.isGoodIdentifier(config.getTier())) { throw new TaskDescriptionException("Tier contains illegal characters: " + config.getTier()); } @@ -327,6 +329,8 @@ public class ConfigurationManager { throw new TaskDescriptionException("Multiple resource values are not supported for " + types); } + maybeFillLinks(builder); + return ITaskConfig.build(builder); } @@ -343,11 +347,12 @@ public class ConfigurationManager { private static void maybeFillLinks(TaskConfig task) { if (task.getTaskLinksSize() == 0) { ImmutableMap.Builder links = ImmutableMap.builder(); - if (task.getRequestedPorts().contains("health")) { - links.put("health", "http://%host%:%port:health%"); - } - if (task.getRequestedPorts().contains("http")) { - links.put("http", "http://%host%:%port:http%"); + for (IResource resource : ResourceManager.getTaskResources(ITaskConfig.build(task), PORTS)) { + if (resource.getNamedPort().equals("health")) { + links.put("health", "http://%host%:%port:health%"); + } else if (resource.getNamedPort().equals("http")) { + links.put("http", "http://%host%:%port:http%"); + } } task.setTaskLinks(links.build()); } http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java index 625e6d5..1ee2cfa 100644 --- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java +++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java @@ -303,10 +303,6 @@ public interface SchedulingFilter { return jobState; } - public Set getRequestedPorts() { - return task.getRequestedPorts(); - } - @Override public boolean equals(Object o) { if (!(o instanceof ResourceRequest)) { http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java new file mode 100644 index 0000000..8b42bf0 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java @@ -0,0 +1,81 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.resources; + +import java.util.EnumSet; +import java.util.Set; +import java.util.stream.Collectors; + +import com.google.common.collect.Iterables; + +import org.apache.aurora.scheduler.storage.entities.IResource; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.apache.mesos.Protos.Resource; + +import static org.apache.mesos.Protos.Offer; + +/** + * Manages resources and provides Aurora/Mesos translation. + */ +public final class ResourceManager { + private ResourceManager() { + // Utility class. + } + + /** + * Gets offer resources matching specified {@link ResourceType}. + * + * @param offer Offer to get resources from. + * @param type {@link ResourceType} to filter resources by. + * @return Offer resources matching {@link ResourceType}. + */ + public static Iterable getOfferResources(Offer offer, ResourceType type) { + return Iterables.filter(offer.getResourcesList(), r -> r.getName().equals(type.getMesosName())); + } + + /** + * Same as {@link #getTaskResources(ITaskConfig, ResourceType)}. + * + * @param task Scheduled task to get resources from. + * @param type {@link ResourceType} to filter resources by. + * @return Task resources matching {@link ResourceType}. + */ + public static Iterable getTaskResources(IScheduledTask task, ResourceType type) { + return getTaskResources(task.getAssignedTask().getTask(), type); + } + + /** + * Gets task resources matching specified {@link ResourceType}. + * + * @param task Task config to get resources from. + * @param type {@link ResourceType} to filter resources by. + * @return Task resources matching {@link ResourceType}. + */ + public static Iterable getTaskResources(ITaskConfig task, ResourceType type) { + return Iterables.filter(task.getResources(), r -> ResourceType.fromResource(r).equals(type)); + } + + /** + * Gets unique task resource types. + * + * @param task Task to get resource types from. + * @return Set of {@link ResourceType} instances representing task resources. + */ + public static Set getTaskResourceTypes(IScheduledTask task) { + return EnumSet.copyOf(task.getAssignedTask().getTask().getResources().stream() + .map(r -> ResourceType.fromResource(r)) + .collect(Collectors.toSet())); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/resources/ResourceMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceMapper.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceMapper.java new file mode 100644 index 0000000..c06ce8d --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceMapper.java @@ -0,0 +1,84 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.resources; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import com.google.common.collect.ContiguousSet; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Range; + +import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.mesos.Protos.Offer; + +import static java.util.stream.StreamSupport.stream; + +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.DiscreteDomain.integers; + +import static org.apache.aurora.scheduler.resources.ResourceType.PORTS; + +/** + * Maps requested (task) resources to available (offer) resources. + */ +public interface ResourceMapper { + + /** + * Maps task resources to offer resources and returns a new task with updated mapping. + * + * @param offer Offer with available resources. + * @param task Task with requested resources. + * @return A new task with updated mapping. + */ + IScheduledTask mapAndAssign(Offer offer, IScheduledTask task); + + PortMapper PORT_MAPPER = new PortMapper(); + + class PortMapper implements ResourceMapper { + @Override + public IScheduledTask mapAndAssign(Offer offer, IScheduledTask task) { + List availablePorts = + stream(ResourceManager.getOfferResources(offer, PORTS).spliterator(), false) + .flatMap(resource -> resource.getRanges().getRangeList().stream()) + .flatMap(range -> ContiguousSet.create( + Range.closed((int) range.getBegin(), (int) range.getEnd()), + integers()).stream()) + .collect(Collectors.toList()); + + Collections.shuffle(availablePorts); + + List requestedPorts = + stream(ResourceManager.getTaskResources(task, PORTS).spliterator(), false) + .map(e -> e.getNamedPort()) + .collect(Collectors.toList()); + + checkState( + availablePorts.size() >= requestedPorts.size(), + String.format("Insufficient ports %d when matching %s", availablePorts.size(), task)); + + Iterator ports = availablePorts.iterator(); + Map portMap = + requestedPorts.stream().collect(Collectors.toMap(key -> key, value -> ports.next())); + + ScheduledTask builder = task.newBuilder(); + builder.getAssignedTask().setAssignedPorts(ImmutableMap.copyOf(portMap)); + return IScheduledTask.build(builder); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java index 1df2c11..a8dee95 100644 --- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java +++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java @@ -43,6 +43,7 @@ import static java.util.Objects.requireNonNull; import static org.apache.aurora.common.quantity.Data.BYTES; import static org.apache.aurora.scheduler.resources.ResourceType.CPUS; import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB; +import static org.apache.aurora.scheduler.resources.ResourceType.PORTS; import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB; /** @@ -94,7 +95,7 @@ public final class ResourceSlot { task.getNumCpus(), Amount.of(task.getRamMb(), Data.MB), Amount.of(task.getDiskMb(), Data.MB), - task.getRequestedPorts().size()); + Iterables.size(ResourceManager.getTaskResources(task, PORTS))); } /** http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java index 6e4d694..ee2b51a 100644 --- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java +++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java @@ -14,6 +14,7 @@ package org.apache.aurora.scheduler.resources; import java.util.EnumSet; +import java.util.Optional; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; @@ -29,6 +30,7 @@ import static java.util.Objects.requireNonNull; import static org.apache.aurora.common.quantity.Data.GB; import static org.apache.aurora.common.quantity.Data.MB; +import static org.apache.aurora.scheduler.resources.ResourceMapper.PORT_MAPPER; import static org.apache.aurora.scheduler.resources.ResourceTypeConverter.DOUBLE; import static org.apache.aurora.scheduler.resources.ResourceTypeConverter.LONG; import static org.apache.aurora.scheduler.resources.ResourceTypeConverter.STRING; @@ -43,22 +45,38 @@ public enum ResourceType implements TEnum { /** * CPU resource. */ - CPUS(_Fields.NUM_CPUS, SCALAR, "cpus", DOUBLE, "CPU", 16, false), + CPUS(_Fields.NUM_CPUS, SCALAR, "cpus", DOUBLE, Optional.empty(), "CPU", 16, false), /** * RAM resource. */ - RAM_MB(_Fields.RAM_MB, SCALAR, "mem", LONG, "RAM", Amount.of(24, GB).as(MB), false), + RAM_MB( + _Fields.RAM_MB, + SCALAR, + "mem", + LONG, + Optional.empty(), + "RAM", + Amount.of(24, GB).as(MB), + false), /** * DISK resource. */ - DISK_MB(_Fields.DISK_MB, SCALAR, "disk", LONG, "disk", Amount.of(450, GB).as(MB), false), + DISK_MB( + _Fields.DISK_MB, + SCALAR, + "disk", + LONG, + Optional.empty(), + "disk", + Amount.of(450, GB).as(MB), + false), /** * Port resource. */ - PORTS(_Fields.NAMED_PORT, RANGES, "ports", STRING, "ports", 1000, true); + PORTS(_Fields.NAMED_PORT, RANGES, "ports", STRING, Optional.of(PORT_MAPPER), "ports", 1000, true); /** * Correspondent thrift {@link org.apache.aurora.gen.Resource} enum value. @@ -81,6 +99,11 @@ public enum ResourceType implements TEnum { private final ResourceTypeConverter typeConverter; /** + * Optional resource mapper to use. + */ + private final Optional mapper; + + /** * Aurora resource name. */ private final String auroraName; @@ -105,6 +128,7 @@ public enum ResourceType implements TEnum { * @param mesosType See {@link #getMesosType()} for more details. * @param mesosName See {@link #getMesosName()} for more details. * @param typeConverter See {@link #getTypeConverter()} for more details. + * @param mapper See {@link #getMapper()} for more details. * @param auroraName See {@link #getAuroraName()} for more details. * @param scalingRange See {@link #getScalingRange()} for more details. * @param isMultipleAllowed See {@link #isMultipleAllowed()} for more details. @@ -114,6 +138,7 @@ public enum ResourceType implements TEnum { Protos.Value.Type mesosType, String mesosName, ResourceTypeConverter typeConverter, + Optional mapper, String auroraName, int scalingRange, boolean isMultipleAllowed) { @@ -123,6 +148,7 @@ public enum ResourceType implements TEnum { this.mesosName = requireNonNull(mesosName); this.typeConverter = requireNonNull(typeConverter); this.auroraName = requireNonNull(auroraName); + this.mapper = requireNonNull(mapper); this.scalingRange = scalingRange; this.isMultipleAllowed = isMultipleAllowed; } @@ -171,6 +197,15 @@ public enum ResourceType implements TEnum { } /** + * Gets optional resource mapper. See {@link ResourceMapper} for more details. + * + * @return Optional ResourceMapper. + */ + public Optional getMapper() { + return mapper; + } + + /** * Gets resource name for internal Aurora representation (e.g. in the UI). * * @return Aurora resource name. http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/resources/Resources.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/resources/Resources.java b/src/main/java/org/apache/aurora/scheduler/resources/Resources.java index 894c6a6..36d1de8 100644 --- a/src/main/java/org/apache/aurora/scheduler/resources/Resources.java +++ b/src/main/java/org/apache/aurora/scheduler/resources/Resources.java @@ -13,8 +13,6 @@ */ package org.apache.aurora.scheduler.resources; -import java.util.Collections; -import java.util.List; import java.util.Set; import com.google.common.base.Function; @@ -23,10 +21,7 @@ import com.google.common.base.Predicates; import com.google.common.collect.ContiguousSet; import com.google.common.collect.DiscreteDomain; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Data; @@ -119,32 +114,6 @@ public final class Resources { getNumAvailablePorts()); } - /** - * Attempts to grab {@code numPorts} from this resource instance. - * - * @param numPorts The number of ports to grab. - * @return The set of ports grabbed. - * @throws InsufficientResourcesException if not enough ports were available. - */ - public Set getPorts(int numPorts) - throws InsufficientResourcesException { - - if (numPorts == 0) { - return ImmutableSet.of(); - } - - List availablePorts = Lists.newArrayList(Sets.newHashSet(Iterables.concat( - Iterables.transform(getPortRanges(), RANGE_TO_MEMBERS)))); - - if (availablePorts.size() < numPorts) { - throw new InsufficientResourcesException( - String.format("Could not get %d ports from %s", numPorts, availablePorts)); - } - - Collections.shuffle(availablePorts); - return ImmutableSet.copyOf(availablePorts.subList(0, numPorts)); - } - private int getNumAvailablePorts() { int offeredPorts = 0; for (Range range : getPortRanges()) { http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/state/StateManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManager.java b/src/main/java/org/apache/aurora/scheduler/state/StateManager.java index 5d34fe3..66bfd72 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/StateManager.java +++ b/src/main/java/org/apache/aurora/scheduler/state/StateManager.java @@ -13,13 +13,14 @@ */ package org.apache.aurora.scheduler.state; -import java.util.Map; import java.util.Set; +import java.util.function.Function; import com.google.common.base.Optional; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.apache.mesos.Protos.SlaveID; @@ -62,7 +63,7 @@ public interface StateManager { * @param taskId ID of the task to mutate. * @param slaveHost Host name that the task is being assigned to. * @param slaveId ID of the slave that the task is being assigned to. - * @param assignedPorts Ports on the host that are being assigned to the task. + * @param resourceAssigner The resource assign operation. * @return The updated task record, or {@code null} if the task was not found. */ IAssignedTask assignTask( @@ -70,7 +71,7 @@ public interface StateManager { String taskId, String slaveHost, SlaveID slaveId, - Map assignedPorts); + Function resourceAssigner); /** * Inserts pending instances using {@code task} as their configuration. Tasks will immediately http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java index e5b2f41..2b4fac1 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java @@ -18,11 +18,11 @@ import java.net.UnknownHostException; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; import javax.inject.Inject; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; @@ -165,18 +165,18 @@ public class StateManagerImpl implements StateManager { String taskId, String slaveHost, SlaveID slaveId, - Map assignedPorts) { + Function resourceAssigner) { checkNotBlank(taskId); checkNotBlank(slaveHost); requireNonNull(slaveId); - requireNonNull(assignedPorts); + requireNonNull(resourceAssigner); IScheduledTask mutated = storeProvider.getUnsafeTaskStore().mutateTask(taskId, task -> { + task = resourceAssigner.apply(task); ScheduledTask builder = task.newBuilder(); builder.getAssignedTask() - .setAssignedPorts(assignedPorts) .setSlaveHost(slaveHost) .setSlaveId(slaveId.getValue()); return IScheduledTask.build(builder); @@ -231,9 +231,6 @@ public class StateManagerImpl implements StateManager { transitionMessage); } - private static final Function GET_ACTION = - SideEffect::getAction; - private static final List ACTIONS_IN_ORDER = ImmutableList.of( Action.INCREMENT_FAILURES, Action.SAVE_STATE, @@ -252,7 +249,7 @@ public class StateManagerImpl implements StateManager { // (thus losing the object to copy), or rescheduling a task before incrementing the failure count // (thus not carrying forward the failure increment). private static final Ordering ACTION_ORDER = - Ordering.explicit(ACTIONS_IN_ORDER).onResultOf(GET_ACTION); + Ordering.explicit(ACTIONS_IN_ORDER).onResultOf(SideEffect::getAction); private StateChangeResult updateTaskAndExternalState( TaskStore.Mutable taskStore, http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java index 7d43d4a..1a3886f 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java +++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java @@ -13,7 +13,6 @@ */ package org.apache.aurora.scheduler.state; -import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -22,8 +21,6 @@ import javax.inject.Inject; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.collect.FluentIterable; import org.apache.aurora.common.inject.TimedInterceptor.Timed; import org.apache.aurora.common.stats.Stats; @@ -38,8 +35,11 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoGroup; import org.apache.aurora.scheduler.mesos.MesosTaskFactory; import org.apache.aurora.scheduler.offers.OfferManager; +import org.apache.aurora.scheduler.resources.ResourceManager; +import org.apache.aurora.scheduler.resources.ResourceType; import org.apache.aurora.scheduler.resources.Resources; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.mesos.Protos.TaskInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,26 +103,29 @@ public interface TaskAssigner { this.tierManager = requireNonNull(tierManager); } + @VisibleForTesting + IScheduledTask mapAndAssignResources(Offer offer, IScheduledTask task) { + IScheduledTask assigned = task; + for (ResourceType type : ResourceManager.getTaskResourceTypes(assigned)) { + if (type.getMapper().isPresent()) { + assigned = type.getMapper().get().mapAndAssign(offer, assigned); + } + } + return assigned; + } + private TaskInfo assign( MutableStoreProvider storeProvider, Offer offer, - Set requestedPorts, String taskId) { String host = offer.getHostname(); - Set selectedPorts = Resources.from(offer).getPorts(requestedPorts.size()); - Preconditions.checkState(selectedPorts.size() == requestedPorts.size()); - - final Iterator names = requestedPorts.iterator(); - Map portsByName = FluentIterable.from(selectedPorts) - .uniqueIndex(input -> names.next()); - IAssignedTask assigned = stateManager.assignTask( storeProvider, taskId, host, offer.getSlaveId(), - portsByName); + task -> mapAndAssignResources(offer, task)); LOG.info( "Offer on slave {} (id {}) is being assigned task for {}.", host, offer.getSlaveId().getValue(), taskId); @@ -158,7 +161,6 @@ public interface TaskAssigner { TaskInfo taskInfo = assign( storeProvider, offer.getOffer(), - resourceRequest.getRequestedPorts(), taskId); try { http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java b/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java index 7313279..98fe860 100644 --- a/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java +++ b/src/test/java/org/apache/aurora/scheduler/configuration/ConfigurationManagerTest.java @@ -298,6 +298,17 @@ public class ConfigurationManagerTest { DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder)); } + @Test + public void testTaskLinks() throws Exception { + TaskConfig builder = CONFIG_WITH_CONTAINER.newBuilder(); + builder.addToResources(namedPort("health")); + builder.unsetTaskLinks(); + + ITaskConfig populated = + DOCKER_CONFIGURATION_MANAGER.validateAndPopulate(ITaskConfig.build(builder)); + assertEquals(ImmutableSet.of("health", "http"), populated.getTaskLinks().keySet()); + } + private void expectTaskDescriptionException(String message) { expectedException.expect(TaskDescriptionException.class); expectedException.expectMessage(message); http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java index 6370a12..94a885f 100644 --- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java @@ -52,6 +52,7 @@ import org.junit.Test; import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE; import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; +import static org.apache.aurora.scheduler.resources.ResourceTestUtil.resetPorts; import static org.apache.aurora.scheduler.resources.ResourceType.CPUS; import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB; import static org.apache.aurora.scheduler.resources.ResourceType.PORTS; @@ -103,18 +104,18 @@ public class SchedulingFilterImplTest extends EasyMockTest { ResourceSlot twoPorts = Resources.from( Offers.createOffer(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK, Pair.of(80, 81))).slot(); - ITaskConfig noPortTask = ITaskConfig.build(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK) - .newBuilder() - .setRequestedPorts(ImmutableSet.of())); - ITaskConfig onePortTask = ITaskConfig.build(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK) - .newBuilder() - .setRequestedPorts(ImmutableSet.of("one"))); - ITaskConfig twoPortTask = ITaskConfig.build(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK) - .newBuilder() - .setRequestedPorts(ImmutableSet.of("one", "two"))); - ITaskConfig threePortTask = ITaskConfig.build(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK) - .newBuilder() - .setRequestedPorts(ImmutableSet.of("one", "two", "three"))); + ITaskConfig noPortTask = resetPorts( + makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK), + ImmutableSet.of()); + ITaskConfig onePortTask = resetPorts( + makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK), + ImmutableSet.of("one")); + ITaskConfig twoPortTask = resetPorts( + makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK), + ImmutableSet.of("one", "two")); + ITaskConfig threePortTask = resetPorts( + makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK), + ImmutableSet.of("one", "two", "three")); Set none = ImmutableSet.of(); IHostAttributes hostA = hostAttributes(HOST_A, host(HOST_A), rack(RACK_A)); http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java index bf18d5d..ad397c6 100644 --- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java @@ -66,6 +66,7 @@ import static org.apache.aurora.scheduler.mesos.TaskExecutors.SOME_OVERHEAD_EXEC import static org.apache.aurora.scheduler.mesos.TestExecutorSettings.THERMOS_CONFIG; import static org.apache.aurora.scheduler.mesos.TestExecutorSettings.THERMOS_EXECUTOR; import static org.apache.aurora.scheduler.resources.ResourceSlot.makeMesosRangeResource; +import static org.apache.aurora.scheduler.resources.ResourceTestUtil.resetPorts; import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -187,8 +188,9 @@ public class MesosTaskFactoryImplTest extends EasyMockTest { @Test public void testCreateFromPortsUnset() { AssignedTask builder = TASK.newBuilder(); - builder.getTask().unsetRequestedPorts(); builder.unsetAssignedPorts(); + builder.setTask( + resetPorts(ITaskConfig.build(builder.getTask()), ImmutableSet.of()).newBuilder()); IAssignedTask assignedTask = IAssignedTask.build(builder); expect(tierManager.getTier(assignedTask.getTask())).andReturn(DEV_TIER); taskFactory = new MesosTaskFactoryImpl(config, tierManager, SERVER_INFO); @@ -353,8 +355,9 @@ public class MesosTaskFactoryImplTest extends EasyMockTest { public void testPopulateDiscoveryInfoNoPort() { config = new ExecutorSettings(THERMOS_CONFIG, true); AssignedTask builder = TASK.newBuilder(); - builder.getTask().unsetRequestedPorts(); builder.unsetAssignedPorts(); + builder.setTask( + resetPorts(ITaskConfig.build(builder.getTask()), ImmutableSet.of()).newBuilder()); IAssignedTask assignedTask = IAssignedTask.build(builder); expect(tierManager.getTier(assignedTask.getTask())).andReturn(DEV_TIER); taskFactory = new MesosTaskFactoryImpl(config, tierManager, SERVER_INFO); http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/resources/AcceptedOfferTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/resources/AcceptedOfferTest.java b/src/test/java/org/apache/aurora/scheduler/resources/AcceptedOfferTest.java index d5f2172..36c5c11 100644 --- a/src/test/java/org/apache/aurora/scheduler/resources/AcceptedOfferTest.java +++ b/src/test/java/org/apache/aurora/scheduler/resources/AcceptedOfferTest.java @@ -29,6 +29,9 @@ import org.apache.mesos.Protos; import org.apache.mesos.Protos.Resource; import org.junit.Test; +import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange; +import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar; +import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer; import static org.apache.aurora.scheduler.resources.ResourceType.CPUS; import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB; import static org.apache.aurora.scheduler.resources.ResourceType.PORTS; @@ -51,10 +54,10 @@ public class AcceptedOfferTest { @Test public void testReservedPredicates() { - Protos.Resource withRole = makeScalar(CPUS.getMesosName(), TEST_ROLE, false, 1.0); + Protos.Resource withRole = mesosScalar(CPUS, TEST_ROLE, false, 1.0); assertTrue(AcceptedOffer.RESERVED.apply(withRole)); assertFalse(AcceptedOffer.NOT_RESERVED.apply(withRole)); - Protos.Resource absentRole = makeScalar(CPUS.getMesosName(), ABSENT_ROLE, false, 1.0); + Protos.Resource absentRole = mesosScalar(CPUS, ABSENT_ROLE, false, 1.0); assertFalse(AcceptedOffer.RESERVED.apply(absentRole)); assertTrue(AcceptedOffer.NOT_RESERVED.apply(absentRole)); } @@ -62,7 +65,7 @@ public class AcceptedOfferTest { @Test public void testAllocateEmpty() { AcceptedOffer acceptedOffer = AcceptedOffer.create( - fakeOffer(Collections.emptyList()), + offer(), ResourceSlot.NONE, ResourceSlot.NONE, ImmutableSet.of(), @@ -73,12 +76,10 @@ public class AcceptedOfferTest { @Test public void testAllocateRange() { - List resources = ImmutableList.builder() - .add(makePortResource(Optional.absent(), 80, 81, 90, 91, 92, 93)) - .add(makePortResource(TEST_ROLE, 100, 101)) - .build(); AcceptedOffer acceptedOffer = AcceptedOffer.create( - fakeOffer(resources), + offer( + mesosRange(PORTS, Optional.absent(), 80, 81, 90, 91, 92, 93), + mesosRange(PORTS, TEST_ROLE, 100, 101)), ResourceSlot.NONE, ResourceSlot.NONE, ImmutableSet.of(80, 90, 100), @@ -87,8 +88,8 @@ public class AcceptedOfferTest { List expected = ImmutableList.builder() // Because we prefer reserved resources and handle them before non-reserved resources, // result should have ports for the reserved resources first. - .add(makePortResource(TEST_ROLE, 100)) - .add(makePortResource(Optional.absent(), 80, 90)) + .add(mesosRange(PORTS, TEST_ROLE, 100)) + .add(mesosRange(PORTS, Optional.absent(), 80, 90)) .build(); assertEquals(expected, acceptedOffer.getTaskResources()); assertEquals(Collections.emptyList(), acceptedOffer.getExecutorResources()); @@ -96,11 +97,10 @@ public class AcceptedOfferTest { @Test(expected = Resources.InsufficientResourcesException.class) public void testAllocateRangeInsufficent() { - List resources = ImmutableList.of( - makePortResource(ABSENT_ROLE, 80), - makePortResource(ABSENT_ROLE, 100, 101)); AcceptedOffer.create( - fakeOffer(resources), + offer( + mesosRange(PORTS, ABSENT_ROLE, 80), + mesosRange(PORTS, ABSENT_ROLE, 100, 101)), ResourceSlot.NONE, ResourceSlot.NONE, ImmutableSet.of(80, 90, 100), @@ -116,52 +116,43 @@ public class AcceptedOfferTest { } private void runAllocateSingleRole(Optional role, boolean cpuRevocable) { - List resources = ImmutableList.builder() - .add(makeScalar( - CPUS.getMesosName(), role, cpuRevocable, TOTAL_SLOT.getNumCpus())) - .add(makeScalar( - RAM_MB.getMesosName(), role, false, TOTAL_SLOT.getRam().as(Data.MB))) - .add(makeScalar( - DISK_MB.getMesosName(), role, false, TOTAL_SLOT.getDisk().as(Data.MB))) - .add(makePortResource(role, TASK_PORTS)) - .build(); - Protos.Offer offer = fakeOffer(resources); + Protos.Offer offer = offer( + mesosScalar(CPUS, role, cpuRevocable, TOTAL_SLOT.getNumCpus()), + mesosScalar(RAM_MB, role, false, TOTAL_SLOT.getRam().as(Data.MB)), + mesosScalar(DISK_MB, role, false, TOTAL_SLOT.getDisk().as(Data.MB)), + mesosRange(PORTS, role, TASK_PORTS)); AcceptedOffer offerAllocation = AcceptedOffer.create( offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(false, cpuRevocable)); List taskList = ImmutableList.builder() - .add(makeScalar(CPUS.getMesosName(), role, cpuRevocable, TASK_SLOT.getNumCpus())) - .add(makeScalar(RAM_MB.getMesosName(), role, false, TASK_SLOT.getRam().as(Data.MB))) - .add(makeScalar( - DISK_MB.getMesosName(), role, false, TASK_SLOT.getDisk().as(Data.MB))) - .add(makePortResource(role, TASK_PORTS)) + .add(mesosScalar(CPUS, role, cpuRevocable, TASK_SLOT.getNumCpus())) + .add(mesosScalar(RAM_MB, role, false, TASK_SLOT.getRam().as(Data.MB))) + .add(mesosScalar( + DISK_MB, role, false, TASK_SLOT.getDisk().as(Data.MB))) + .add(mesosRange(PORTS, role, TASK_PORTS)) .build(); assertEquals(taskList, offerAllocation.getTaskResources()); List executorList = ImmutableList.builder() - .add(makeScalar( - CPUS.getMesosName(), role, cpuRevocable, EXECUTOR_SLOT.getNumCpus())) - .add(makeScalar( - RAM_MB.getMesosName(), role, false, EXECUTOR_SLOT.getRam().as(Data.MB))) - .add(makeScalar( - DISK_MB.getMesosName(), role, false, EXECUTOR_SLOT.getDisk().as(Data.MB))) + .add(mesosScalar( + CPUS, role, cpuRevocable, EXECUTOR_SLOT.getNumCpus())) + .add(mesosScalar( + RAM_MB, role, false, EXECUTOR_SLOT.getRam().as(Data.MB))) + .add(mesosScalar( + DISK_MB, role, false, EXECUTOR_SLOT.getDisk().as(Data.MB))) .build(); assertEquals(executorList, offerAllocation.getExecutorResources()); } @Test(expected = Resources.InsufficientResourcesException.class) public void testAllocateSingleRoleInsufficient() { - List resources = ImmutableList.builder() + Protos.Offer offer = offer( // EXECUTOR_SLOT's CPU is not included here. - .add(makeScalar(CPUS.getMesosName(), TEST_ROLE, false, TASK_SLOT.getNumCpus())) - .add(makeScalar( - RAM_MB.getMesosName(), TEST_ROLE, false, TOTAL_SLOT.getRam().as(Data.MB))) - .add(makeScalar( - DISK_MB.getMesosName(), TEST_ROLE, false, TOTAL_SLOT.getDisk().as(Data.MB))) - .add(makePortResource(TEST_ROLE, TASK_PORTS)) - .build(); - Protos.Offer offer = fakeOffer(resources); + mesosScalar(CPUS, TEST_ROLE, false, TASK_SLOT.getNumCpus()), + mesosScalar(RAM_MB, TEST_ROLE, false, TOTAL_SLOT.getRam().as(Data.MB)), + mesosScalar(DISK_MB, TEST_ROLE, false, TOTAL_SLOT.getDisk().as(Data.MB)), + mesosRange(PORTS, TEST_ROLE, TASK_PORTS)); AcceptedOffer.create( offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(false, false)); @@ -174,136 +165,64 @@ public class AcceptedOfferTest { } private void runMultipleRoles(boolean cpuRevocable) { - List resources = ImmutableList.builder() + Protos.Offer offer = offer( // Make cpus come from two roles. - .add(makeScalar( - CPUS.getMesosName(), - TEST_ROLE, - cpuRevocable, - EXECUTOR_SLOT.getNumCpus())) - .add(makeScalar( - CPUS.getMesosName(), - ABSENT_ROLE, - cpuRevocable, - TASK_SLOT.getNumCpus())) + mesosScalar(CPUS, TEST_ROLE, cpuRevocable, EXECUTOR_SLOT.getNumCpus()), + mesosScalar(CPUS, ABSENT_ROLE, cpuRevocable, TASK_SLOT.getNumCpus()), // Make ram come from default role - .add(makeScalar( - RAM_MB.getMesosName(), - ABSENT_ROLE, - false, - TOTAL_SLOT.getRam().as(Data.MB))) + mesosScalar(RAM_MB, ABSENT_ROLE, false, TOTAL_SLOT.getRam().as(Data.MB)), // Make disk come from non-default role. - .add(makeScalar( - DISK_MB.getMesosName(), - TEST_ROLE, - false, - TOTAL_SLOT.getDisk().as(Data.MB))) - .add(makePortResource(TEST_ROLE, 80)) - .add(makePortResource(ABSENT_ROLE, 90)) - .build(); - - Protos.Offer offer = fakeOffer(resources); + mesosScalar(DISK_MB, TEST_ROLE, false, TOTAL_SLOT.getDisk().as(Data.MB)), + mesosRange(PORTS, TEST_ROLE, 80), + mesosRange(PORTS, ABSENT_ROLE, 90)); AcceptedOffer offerAllocation = AcceptedOffer.create( offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(false, cpuRevocable)); List taskList = ImmutableList.builder() // We intentionally sliced the offer resource to not align with TASK_SLOT's num cpus. - .add(makeScalar( - CPUS.getMesosName(), TEST_ROLE, cpuRevocable, EXECUTOR_SLOT.getNumCpus())) - .add(makeScalar( - CPUS.getMesosName(), + .add(mesosScalar( + CPUS, TEST_ROLE, cpuRevocable, EXECUTOR_SLOT.getNumCpus())) + .add(mesosScalar( + CPUS, ABSENT_ROLE, cpuRevocable, TASK_SLOT.subtract(EXECUTOR_SLOT).getNumCpus())) - .add(makeScalar( - RAM_MB.getMesosName(), ABSENT_ROLE, false, TASK_SLOT.getRam().as(Data.MB))) - .add(makeScalar( - DISK_MB.getMesosName(), TEST_ROLE, false, TASK_SLOT.getDisk().as(Data.MB))) - .add(makePortResource(TEST_ROLE, 80)) - .add(makePortResource(ABSENT_ROLE, 90)) + .add(mesosScalar( + RAM_MB, ABSENT_ROLE, false, TASK_SLOT.getRam().as(Data.MB))) + .add(mesosScalar( + DISK_MB, TEST_ROLE, false, TASK_SLOT.getDisk().as(Data.MB))) + .add(mesosRange(PORTS, TEST_ROLE, 80)) + .add(mesosRange(PORTS, ABSENT_ROLE, 90)) .build(); assertEquals(taskList, offerAllocation.getTaskResources()); List executorList = ImmutableList.builder() - .add(makeScalar( - CPUS.getMesosName(), ABSENT_ROLE, cpuRevocable, EXECUTOR_SLOT.getNumCpus())) - .add(makeScalar( - RAM_MB.getMesosName(), ABSENT_ROLE, false, EXECUTOR_SLOT.getRam().as(Data.MB))) - .add(makeScalar( - DISK_MB.getMesosName(), TEST_ROLE, false, EXECUTOR_SLOT.getDisk().as(Data.MB))) + .add(mesosScalar( + CPUS, ABSENT_ROLE, cpuRevocable, EXECUTOR_SLOT.getNumCpus())) + .add(mesosScalar( + RAM_MB, ABSENT_ROLE, false, EXECUTOR_SLOT.getRam().as(Data.MB))) + .add(mesosScalar( + DISK_MB, TEST_ROLE, false, EXECUTOR_SLOT.getDisk().as(Data.MB))) .build(); assertEquals(executorList, offerAllocation.getExecutorResources()); } @Test(expected = Resources.InsufficientResourcesException.class) public void testMultipleRolesInsufficient() { - // Similar to testMultipleRoles, but make some of cpus as revocable - List resources = ImmutableList.builder() + Protos.Offer offer = offer( + // Similar to testMultipleRoles, but make some of cpus as revocable // Make cpus come from two roles. - .add(makeScalar( - CPUS.getMesosName(), - TEST_ROLE, - true, - EXECUTOR_SLOT.getNumCpus())) - .add(makeScalar( - CPUS.getMesosName(), - ABSENT_ROLE, - false, - TASK_SLOT.getNumCpus())) + mesosScalar(CPUS, TEST_ROLE, true, EXECUTOR_SLOT.getNumCpus()), + mesosScalar(CPUS, ABSENT_ROLE, false, TASK_SLOT.getNumCpus()), // Make ram come from default role - .add(makeScalar( - RAM_MB.getMesosName(), - ABSENT_ROLE, - false, - TOTAL_SLOT.getRam().as(Data.MB))) + mesosScalar(RAM_MB, ABSENT_ROLE, false, TOTAL_SLOT.getRam().as(Data.MB)), // Make disk come from non-default role. - .add(makeScalar( - DISK_MB.getMesosName(), - TEST_ROLE, - false, - TOTAL_SLOT.getDisk().as(Data.MB))) - .add(makePortResource(TEST_ROLE, 80)) - .add(makePortResource(ABSENT_ROLE, 90)) - .build(); - Protos.Offer offer = fakeOffer(resources); + mesosScalar(DISK_MB, TEST_ROLE, false, TOTAL_SLOT.getDisk().as(Data.MB)), + mesosRange(PORTS, TEST_ROLE, 80), + mesosRange(PORTS, ABSENT_ROLE, 90)); // We don't have enough resource to satisfy a non-revocable request. AcceptedOffer.create( offer, TASK_SLOT, EXECUTOR_SLOT, TASK_PORTS_SET, new TierInfo(false, false)); } - - private static Resource makePortResource(Optional role, Integer... values) { - Resource.Builder prototype = Resource.newBuilder() - .setType(Protos.Value.Type.RANGES) - .setName(PORTS.getMesosName()); - if (role.isPresent()) { - prototype.setRole(role.get()); - } - return AcceptedOffer.makeMesosRangeResource(prototype.build(), ImmutableSet.copyOf(values)); - } - - private static Resource makeScalar( - String name, Optional role, boolean revocable, double value) { - Resource.Builder resource = Resource.newBuilder() - .setName(name) - .setType(Protos.Value.Type.SCALAR) - .setScalar(Protos.Value.Scalar.newBuilder().setValue(value)); - if (role.isPresent()) { - resource.setRole(role.get()); - } - if (revocable) { - resource.setRevocable(Resource.RevocableInfo.getDefaultInstance()); - } - return resource.build(); - } - - private static Protos.Offer fakeOffer(List resources) { - return Protos.Offer.newBuilder() - .setId(Protos.OfferID.newBuilder().setValue("offer-id")) - .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("framework-id")) - .setSlaveId(Protos.SlaveID.newBuilder().setValue("slave-id")) - .setHostname("hostname") - .addAllResources(resources) - .build(); - } } http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/resources/PortMapperTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/resources/PortMapperTest.java b/src/test/java/org/apache/aurora/scheduler/resources/PortMapperTest.java new file mode 100644 index 0000000..c94f7a9 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/resources/PortMapperTest.java @@ -0,0 +1,62 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.resources; + +import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.mesos.Protos; +import org.junit.Test; + +import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB; +import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask; +import static org.apache.aurora.scheduler.resources.ResourceMapper.PORT_MAPPER; +import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange; +import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer; +import static org.apache.aurora.scheduler.resources.ResourceType.PORTS; +import static org.junit.Assert.assertEquals; + +public class PortMapperTest { + @Test + public void testAssignNoPorts() { + ScheduledTask builder = makeTask("id", JOB).newBuilder(); + builder.getAssignedTask().getTask().unsetResources(); + builder.getAssignedTask().unsetAssignedPorts(); + IScheduledTask task = IScheduledTask.build(builder); + + assertEquals(task, PORT_MAPPER.mapAndAssign(offer(), task)); + } + + @Test(expected = IllegalStateException.class) + public void testPortRangeScarcity() { + PORT_MAPPER.mapAndAssign(offer(), makeTask("id", JOB)); + } + + @Test + public void testPortRangeAbundance() { + Protos.Offer offer = offer(mesosRange(PORTS, 1, 2, 3, 4, 5)); + assertEquals( + 1, + PORT_MAPPER.mapAndAssign(offer, makeTask("id", JOB)) + .getAssignedTask().getAssignedPorts().size()); + } + + @Test + public void testPortRangeExact() { + Protos.Offer offer = offer(mesosRange(PORTS, 1)); + assertEquals( + 1, + PORT_MAPPER.mapAndAssign(offer, makeTask("id", JOB)) + .getAssignedTask().getAssignedPorts().size()); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/resources/ResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/resources/ResourceManagerTest.java b/src/test/java/org/apache/aurora/scheduler/resources/ResourceManagerTest.java new file mode 100644 index 0000000..b6810b1 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/resources/ResourceManagerTest.java @@ -0,0 +1,88 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.resources; + +import java.util.EnumSet; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; + +import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.scheduler.storage.entities.IResource; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.mesos.Protos; +import org.apache.mesos.Protos.Offer; +import org.apache.mesos.Protos.Value.Scalar; +import org.junit.Test; + +import static org.apache.aurora.gen.Resource.namedPort; +import static org.apache.aurora.gen.Resource.numCpus; +import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB; +import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask; +import static org.apache.aurora.scheduler.resources.ResourceType.CPUS; +import static org.apache.aurora.scheduler.resources.ResourceType.PORTS; +import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB; +import static org.apache.mesos.Protos.Value.Type.SCALAR; +import static org.junit.Assert.assertEquals; + +public class ResourceManagerTest { + @Test + public void testGetOfferResources() { + Protos.Resource resource1 = Protos.Resource.newBuilder() + .setType(SCALAR) + .setName(CPUS.getMesosName()) + .setScalar(Scalar.newBuilder().setValue(2.0).build()) + .build(); + + Protos.Resource resource2 = Protos.Resource.newBuilder() + .setType(SCALAR) + .setName(RAM_MB.getMesosName()) + .setScalar(Scalar.newBuilder().setValue(64).build()) + .build(); + + Offer offer = Offer.newBuilder() + .setId(Protos.OfferID.newBuilder().setValue("offer-id")) + .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("framework-id")) + .setSlaveId(Protos.SlaveID.newBuilder().setValue("slave-id")) + .setHostname("hostname") + .addAllResources(ImmutableSet.of(resource1, resource2)).build(); + + assertEquals( + resource1, + Iterables.getOnlyElement(ResourceManager.getOfferResources(offer, CPUS))); + assertEquals( + resource2, + Iterables.getOnlyElement(ResourceManager.getOfferResources(offer, RAM_MB))); + } + + @Test + public void testGetTaskResources() { + assertEquals( + IResource.build(numCpus(1.0)), + Iterables.getOnlyElement(ResourceManager.getTaskResources(makeTask("id", JOB), CPUS))); + assertEquals( + IResource.build(namedPort("http")), + Iterables.getOnlyElement(ResourceManager.getTaskResources(makeTask("id", JOB), PORTS))); + } + + @Test + public void testGetTaskResourceTypes() { + ScheduledTask builder = makeTask("id", JOB).newBuilder(); + builder.getAssignedTask().getTask().addToResources(namedPort("health")); + + assertEquals( + EnumSet.allOf(ResourceType.class), + ResourceManager.getTaskResourceTypes(IScheduledTask.build(builder))); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java b/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java index 04a8238..1583cef 100644 --- a/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java +++ b/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java @@ -13,14 +13,26 @@ */ package org.apache.aurora.scheduler.resources; +import java.util.Set; + +import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import org.apache.aurora.gen.Resource; import org.apache.aurora.gen.ResourceAggregate; +import org.apache.aurora.gen.TaskConfig; +import org.apache.aurora.scheduler.base.Numbers; +import org.apache.aurora.scheduler.storage.entities.IResource; import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.apache.mesos.Protos; import static org.apache.aurora.gen.Resource.diskMb; import static org.apache.aurora.gen.Resource.numCpus; import static org.apache.aurora.gen.Resource.ramMb; +import static org.apache.aurora.scheduler.resources.ResourceType.PORTS; +import static org.apache.aurora.scheduler.resources.ResourceType.fromResource; /** * Convenience methods for working with resources. @@ -45,4 +57,77 @@ public final class ResourceTestUtil { .setRamMb(ramMb) .setDiskMb(diskMb)); } + + public static ITaskConfig resetPorts(ITaskConfig config, Set portNames) { + TaskConfig builder = config.newBuilder() + .setRequestedPorts(portNames); + builder.getResources().removeIf(e -> fromResource(IResource.build(e)).equals(PORTS)); + portNames.forEach(e -> builder.addToResources(Resource.namedPort(e))); + return ITaskConfig.build(builder); + } + + public static Protos.Resource mesosScalar(ResourceType type, double value) { + return mesosScalar(type, Optional.absent(), false, value); + } + + public static Protos.Resource mesosScalar(ResourceType type, double value, boolean revocable) { + return mesosScalar(type, Optional.absent(), revocable, value); + } + + public static Protos.Resource mesosScalar( + ResourceType type, + Optional role, + boolean revocable, + double value) { + + return resourceBuilder(type, role, revocable) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(value).build()) + .build(); + } + + public static Protos.Resource mesosRange(ResourceType type, Integer... values) { + return mesosRange(type, Optional.absent(), values); + } + + public static Protos.Resource mesosRange( + ResourceType type, + Optional role, + Integer... values) { + + return resourceBuilder(type, role, false) + .setRanges(Protos.Value.Ranges.newBuilder().addAllRange( + Iterables.transform( + Numbers.toRanges(ImmutableSet.copyOf(values)), + ResourceSlot.RANGE_TRANSFORM))) + .build(); + } + + public static Protos.Offer offer(Protos.Resource... resources) { + return Protos.Offer.newBuilder() + .setId(Protos.OfferID.newBuilder().setValue("offer-id")) + .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("framework-id")) + .setSlaveId(Protos.SlaveID.newBuilder().setValue("slave-id")) + .setHostname("hostname") + .addAllResources(ImmutableSet.copyOf(resources)).build(); + } + + private static Protos.Resource.Builder resourceBuilder( + ResourceType type, + Optional role, + boolean revocable) { + + Protos.Resource.Builder builder = Protos.Resource.newBuilder() + .setType(type.getMesosType()) + .setName(type.getMesosName()); + + if (revocable) { + builder.setRevocable(Protos.Resource.RevocableInfo.getDefaultInstance()); + } + + if (role.isPresent()) { + builder.setRole(role.get()); + } + + return builder; + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/resources/ResourcesTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/resources/ResourcesTest.java b/src/test/java/org/apache/aurora/scheduler/resources/ResourcesTest.java index ceb81e4..185338e 100644 --- a/src/test/java/org/apache/aurora/scheduler/resources/ResourcesTest.java +++ b/src/test/java/org/apache/aurora/scheduler/resources/ResourcesTest.java @@ -13,164 +13,64 @@ */ package org.apache.aurora.scheduler.resources; -import java.util.Set; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; - -import org.apache.aurora.common.collections.Pair; import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.scheduler.resources.Resources.InsufficientResourcesException; import org.apache.mesos.Protos; -import org.apache.mesos.Protos.Resource; -import org.apache.mesos.Protos.Value.Range; -import org.apache.mesos.Protos.Value.Ranges; import org.junit.Test; import static org.apache.aurora.common.quantity.Data.MB; import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER; import static org.apache.aurora.scheduler.base.TaskTestUtil.REVOCABLE_TIER; -import static org.apache.aurora.scheduler.resources.ResourceSlot.makeMesosResource; +import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange; +import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar; +import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer; import static org.apache.aurora.scheduler.resources.ResourceType.CPUS; import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB; import static org.apache.aurora.scheduler.resources.ResourceType.PORTS; import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB; -import static org.apache.mesos.Protos.Value.Type.RANGES; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; public class ResourcesTest { @Test - public void testPortRangeExact() { - Resource portsResource = createPortRange(Pair.of(1, 5)); - Set ports = Resources.from(createOffer(portsResource)).getPorts(5); - assertEquals(5, ports.size()); - } - - @Test - public void testOnePortAvailable() { - Resource portsResource = createPortRange(Pair.of(3, 3)); - Set ports = Resources.from(createOffer(portsResource)).getPorts(1); - assertEquals(1, ports.size()); - } - - @Test - public void testPortRangeAbundance() { - Resource portsResource = createPortRange(Pair.of(1, 10)); - Set ports = Resources.from(createOffer(portsResource)).getPorts(5); - assertEquals(5, ports.size()); - } - - @Test - public void testPortRangeExhaust() { - Resource portsResource = createPortRanges(Pair.of(1, 2), Pair.of(10, 15)); - - Set ports = Resources.from(createOffer(portsResource)).getPorts(7); - assertEquals(7, ports.size()); - - ports = Resources.from(createOffer(portsResource)).getPorts(8); - assertEquals(8, ports.size()); - - try { - Resources.from(createOffer(portsResource)).getPorts(9); - fail("Ports should not have been sufficient"); - } catch (InsufficientResourcesException e) { - // Expected. - } - } - - @Test - public void testGetNoPorts() { - Resource portsResource = createPortRange(Pair.of(1, 5)); - assertEquals(ImmutableSet.of(), Resources.from(createOffer(portsResource)).getPorts(0)); - } - - @Test(expected = Resources.InsufficientResourcesException.class) - public void testPortRangeScarcity() { - Resource portsResource = createPortRange(Pair.of(1, 2)); - Resources.from(createOffer(portsResource)).getPorts(5); - } - - @Test public void testGetSlot() { - ImmutableList resources = ImmutableList.builder() - .add(makeMesosResource(CPUS, 8.0, false)) - .add(makeMesosResource(RAM_MB, 1024, false)) - .add(makeMesosResource(DISK_MB, 2048, false)) - .add(createPortRange(Pair.of(1, 10))) - .build(); - - ResourceSlot expected = new ResourceSlot(8.0, Amount.of(1024L, MB), Amount.of(2048L, MB), 10); - assertEquals(expected, Resources.from(createOffer(resources)).slot()); + Protos.Offer offer = offer( + mesosScalar(CPUS, 8.0, false), + mesosScalar(RAM_MB, 1024, false), + mesosScalar(DISK_MB, 2048, false), + mesosRange(PORTS, 1, 2, 3)); + + ResourceSlot expected = new ResourceSlot(8.0, Amount.of(1024L, MB), Amount.of(2048L, MB), 3); + assertEquals(expected, Resources.from(offer).slot()); } @Test public void testMissingResourcesHandledGracefully() { - ImmutableList resources = ImmutableList.builder().build(); - assertEquals(ResourceSlot.NONE, Resources.from(createOffer(resources)).slot()); + assertEquals(ResourceSlot.NONE, Resources.from(offer()).slot()); } @Test public void testFilter() { - ImmutableList resources = ImmutableList.builder() - .add(makeMesosResource(CPUS, 8.0, true)) - .add(makeMesosResource(RAM_MB, 1024, false)) - .build(); + Protos.Offer offer = offer( + mesosScalar(CPUS, 8.0, true), + mesosScalar(RAM_MB, 1024, false)); assertEquals( new ResourceSlot(8.0, Amount.of(1024L, MB), Amount.of(0L, MB), 0), - Resources.from(createOffer(resources)).filter(Resources.REVOCABLE).slot()); + Resources.from(offer).filter(Resources.REVOCABLE).slot()); } @Test public void testFilterByTier() { - ImmutableList resources = ImmutableList.builder() - .add(makeMesosResource(CPUS, 8.0, true)) - .add(makeMesosResource(CPUS, 8.0, false)) - .add(makeMesosResource(RAM_MB, 1024, false)) - .build(); + Protos.Offer offer = offer( + mesosScalar(CPUS, 8.0, true), + mesosScalar(CPUS, 8.0, false), + mesosScalar(RAM_MB, 1024, false)); assertEquals( new ResourceSlot(8.0, Amount.of(1024L, MB), Amount.of(0L, MB), 0), - Resources.from(createOffer(resources)).filter(REVOCABLE_TIER).slot()); + Resources.from(offer).filter(REVOCABLE_TIER).slot()); assertEquals( new ResourceSlot(8.0, Amount.of(1024L, MB), Amount.of(0L, MB), 0), - Resources.from(createOffer(resources)).filter(DEV_TIER).slot()); - } - - private Resource createPortRange(Pair range) { - return createPortRanges(ImmutableSet.of(range)); - } - - private Resource createPortRanges(Pair rangeA, Pair rangeB) { - return createPortRanges( - ImmutableSet.>builder().add(rangeA).add(rangeB).build()); - } - - private Resource createPortRanges(Set> ports) { - Ranges.Builder ranges = Ranges.newBuilder(); - for (Pair range : ports) { - ranges.addRange(Range.newBuilder().setBegin(range.getFirst()).setEnd(range.getSecond())); - } - - return Resource.newBuilder() - .setName(PORTS.getMesosName()) - .setType(RANGES) - .setRanges(ranges) - .build(); - } - - private static Protos.Offer createOffer(Resource resource) { - return createOffer(ImmutableList.of(resource)); - } - - private static Protos.Offer createOffer(Iterable resources) { - return Protos.Offer.newBuilder() - .setId(Protos.OfferID.newBuilder().setValue("offer-id")) - .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("framework-id")) - .setSlaveId(Protos.SlaveID.newBuilder().setValue("slave-id")) - .setHostname("hostname") - .addAllResources(resources).build(); + Resources.from(offer).filter(DEV_TIER).slot()); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java index 498da78..2370178 100644 --- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java @@ -17,6 +17,8 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; @@ -44,6 +46,7 @@ import org.apache.aurora.scheduler.events.PubsubEvent; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted; import org.apache.aurora.scheduler.mesos.Driver; +import org.apache.aurora.scheduler.resources.ResourceManager; import org.apache.aurora.scheduler.scheduling.RescheduleCalculator; import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.Storage; @@ -69,6 +72,8 @@ import static org.apache.aurora.gen.ScheduleStatus.LOST; import static org.apache.aurora.gen.ScheduleStatus.PENDING; import static org.apache.aurora.gen.ScheduleStatus.RUNNING; import static org.apache.aurora.gen.ScheduleStatus.THROTTLED; +import static org.apache.aurora.scheduler.resources.ResourceTestUtil.resetPorts; +import static org.apache.aurora.scheduler.resources.ResourceType.PORTS; import static org.apache.aurora.scheduler.state.StateChangeResult.ILLEGAL; import static org.apache.aurora.scheduler.state.StateChangeResult.INVALID_CAS_STATE; import static org.apache.aurora.scheduler.state.StateChangeResult.NOOP; @@ -403,14 +408,10 @@ public class StateManagerImplTest extends EasyMockTest { storeProvider -> stateManager.deleteTasks(storeProvider, ImmutableSet.of(taskId))); } - private static ITaskConfig setRequestedPorts(ITaskConfig config, Set portNames) { - return ITaskConfig.build(config.newBuilder().setRequestedPorts(portNames)); - } - @Test public void testPortResource() throws Exception { Set requestedPorts = ImmutableSet.of("one", "two", "three"); - ITaskConfig task = setRequestedPorts(NON_SERVICE_CONFIG, requestedPorts); + ITaskConfig task = resetPorts(NON_SERVICE_CONFIG, requestedPorts); String taskId = "a"; expect(taskIdGenerator.generate(task, 0)).andReturn(taskId); @@ -425,13 +426,15 @@ public class StateManagerImplTest extends EasyMockTest { assertEquals( requestedPorts, - actual.getAssignedTask().getTask().getRequestedPorts()); + StreamSupport.stream(ResourceManager.getTaskResources(actual, PORTS).spliterator(), false) + .map(e -> e.getNamedPort()) + .collect(Collectors.toSet())); } @Test public void testPortResourceResetAfterReschedule() throws Exception { Set requestedPorts = ImmutableSet.of("one"); - ITaskConfig task = setRequestedPorts(NON_SERVICE_CONFIG, requestedPorts); + ITaskConfig task = resetPorts(NON_SERVICE_CONFIG, requestedPorts); String taskId = "a"; expect(taskIdGenerator.generate(task, 0)).andReturn(taskId); @@ -561,6 +564,10 @@ public class StateManagerImplTest extends EasyMockTest { taskId, host.getHost(), SlaveID.newBuilder().setValue(host.getSlaveId()).build(), - ports)); + e -> { + ScheduledTask builder = e.newBuilder(); + builder.getAssignedTask().setAssignedPorts(ports); + return IScheduledTask.build(builder); + })); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/d8d1c8d6/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java index a2df311..ca10323 100644 --- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java @@ -20,8 +20,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.aurora.common.testing.easymock.EasyMockTest; -import org.apache.aurora.gen.AssignedTask; -import org.apache.aurora.gen.ExecutorConfig; import org.apache.aurora.gen.HostAttributes; import org.apache.aurora.gen.JobKey; import org.apache.aurora.gen.ScheduledTask; @@ -56,41 +54,31 @@ import org.junit.Test; import static org.apache.aurora.gen.ScheduleStatus.LOST; import static org.apache.aurora.gen.ScheduleStatus.PENDING; import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER; +import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB; +import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask; import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; +import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange; +import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer; +import static org.apache.aurora.scheduler.resources.ResourceType.PORTS; import static org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl.LAUNCH_FAILED_MSG; import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import static org.apache.mesos.Protos.Offer; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class TaskAssignerImplTest extends EasyMockTest { - private static final int PORT = 5000; - private static final String SLAVE_ID = "slaveId"; - private static final Offer MESOS_OFFER = Offer.newBuilder() - .setId(OfferID.newBuilder().setValue("offerId")) - .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId")) - .setSlaveId(SlaveID.newBuilder().setValue(SLAVE_ID)) - .setHostname("hostName") - .addResources(Resource.newBuilder() - .setName("ports") - .setType(Type.RANGES) - .setRanges( - Ranges.newBuilder().addRange(Range.newBuilder().setBegin(PORT).setEnd(PORT)))) - .build(); + private static final int PORT = 1000; + private static final Offer MESOS_OFFER = offer(mesosRange(PORTS, PORT)); + private static final String SLAVE_ID = MESOS_OFFER.getSlaveId().getValue(); private static final HostOffer OFFER = new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes())); - private static final String PORT_NAME = "http"; - private static final IScheduledTask TASK = IScheduledTask.build( - new ScheduledTask() - .setAssignedTask(new AssignedTask() - .setTaskId("taskId") - .setTask(new TaskConfig() - .setJob(new JobKey("r", "e", "n")) - .setExecutorConfig(new ExecutorConfig().setData("opaque data")) - .setRequestedPorts(ImmutableSet.of(PORT_NAME))))); + private static final IScheduledTask TASK = makeTask("id", JOB); private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask()); private static final TaskInfo TASK_INFO = TaskInfo.newBuilder() .setName("taskName") @@ -108,7 +96,7 @@ public class TaskAssignerImplTest extends EasyMockTest { private SchedulingFilter filter; private MesosTaskFactory taskFactory; private OfferManager offerManager; - private TaskAssigner assigner; + private TaskAssignerImpl assigner; private TierManager tierManager; @Before @@ -128,13 +116,7 @@ public class TaskAssignerImplTest extends EasyMockTest { offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO); expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of()); - expect(stateManager.assignTask( - storeProvider, - Tasks.id(TASK), - MESOS_OFFER.getHostname(), - MESOS_OFFER.getSlaveId(), - ImmutableMap.of(PORT_NAME, PORT))) - .andReturn(TASK.getAssignedTask()); + expectAssignTask(MESOS_OFFER); expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER)) .andReturn(TASK_INFO); @@ -190,13 +172,7 @@ public class TaskAssignerImplTest extends EasyMockTest { expectLastCall().andThrow(new OfferManager.LaunchException("expected")); expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of()); - expect(stateManager.assignTask( - storeProvider, - Tasks.id(TASK), - MESOS_OFFER.getHostname(), - MESOS_OFFER.getSlaveId(), - ImmutableMap.of(PORT_NAME, PORT))) - .andReturn(TASK.getAssignedTask()); + expectAssignTask(MESOS_OFFER); expect(stateManager.changeState( storeProvider, Tasks.id(TASK), @@ -254,13 +230,7 @@ public class TaskAssignerImplTest extends EasyMockTest { expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(offer, OFFER)); expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); expect(filter.filter(UNUSED, RESOURCE_REQUEST)).andReturn(ImmutableSet.of()); - expect(stateManager.assignTask( - storeProvider, - Tasks.id(TASK), - offer.getOffer().getHostname(), - offer.getOffer().getSlaveId(), - ImmutableMap.of(PORT_NAME, PORT))) - .andReturn(TASK.getAssignedTask()); + expectAssignTask(offer.getOffer()); expect(taskFactory.createFrom(TASK.getAssignedTask(), offer.getOffer())) .andReturn(TASK_INFO); offerManager.launchTask(offer.getOffer().getId(), TASK_INFO); @@ -306,13 +276,7 @@ public class TaskAssignerImplTest extends EasyMockTest { new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY))) .andReturn(ImmutableSet.of()); - expect(stateManager.assignTask( - storeProvider, - Tasks.id(TASK), - OFFER.getOffer().getHostname(), - OFFER.getOffer().getSlaveId(), - ImmutableMap.of(PORT_NAME, PORT))) - .andReturn(TASK.getAssignedTask()); + expectAssignTask(MESOS_OFFER); expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER.getOffer())) .andReturn(TASK_INFO); offerManager.launchTask(OFFER.getOffer().getId(), TASK_INFO); @@ -326,4 +290,23 @@ public class TaskAssignerImplTest extends EasyMockTest { Tasks.id(TASK), ImmutableMap.of(SLAVE_ID, GROUP_KEY))); } + + @Test + public void testResourceMapperCallback() { + ScheduledTask builder = TASK.newBuilder(); + builder.getAssignedTask().unsetAssignedPorts(); + + control.replay(); + + assertEquals(TASK, assigner.mapAndAssignResources(MESOS_OFFER, IScheduledTask.build(builder))); + } + + private void expectAssignTask(Offer offer) { + expect(stateManager.assignTask( + eq(storeProvider), + eq(Tasks.id(TASK)), + eq(offer.getHostname()), + eq(offer.getSlaveId()), + anyObject())).andReturn(TASK.getAssignedTask()); + } }