Return-Path: X-Original-To: apmail-aurora-commits-archive@minotaur.apache.org Delivered-To: apmail-aurora-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8B331C6B7 for ; Thu, 13 Nov 2014 00:12:19 +0000 (UTC) Received: (qmail 14043 invoked by uid 500); 13 Nov 2014 00:12:19 -0000 Delivered-To: apmail-aurora-commits-archive@aurora.apache.org Received: (qmail 14000 invoked by uid 500); 13 Nov 2014 00:12:19 -0000 Mailing-List: contact commits-help@aurora.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.incubator.apache.org Delivered-To: mailing list commits@aurora.incubator.apache.org Received: (qmail 13991 invoked by uid 99); 13 Nov 2014 00:12:19 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Nov 2014 00:12:19 +0000 X-ASF-Spam-Status: No, hits=-2000.6 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 13 Nov 2014 00:11:46 +0000 Received: (qmail 9929 invoked by uid 99); 13 Nov 2014 00:10:27 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Nov 2014 00:10:27 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 77677A1212B; Thu, 13 Nov 2014 00:10:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wfarner@apache.org To: commits@aurora.incubator.apache.org Date: Thu, 13 Nov 2014 00:10:28 -0000 Message-Id: <3d3df7db509e43d3ad306f49f51b8dd5@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] incubator-aurora git commit: Store host attributes alongside offers to reduce number of lookups. 
X-Virus-Checked: Checked by ClamAV on apache.org Store host attributes alongside offers to reduce number of lookups. Bugs closed: AURORA-913 Reviewed at https://reviews.apache.org/r/27902/ Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/b80e69c9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/b80e69c9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/b80e69c9 Branch: refs/heads/master Commit: b80e69c9b9d0e18b9ea1189fd310e6995e4e1fe9 Parents: 2e2c4b3 Author: Bill Farner Authored: Wed Nov 12 16:10:03 2014 -0800 Committer: Bill Farner Committed: Wed Nov 12 16:10:03 2014 -0800 ---------------------------------------------------------------------- .../org/apache/aurora/scheduler/HostOffer.java | 66 ++++ .../apache/aurora/scheduler/TaskLauncher.java | 3 +- .../aurora/scheduler/UserTaskLauncher.java | 3 +- .../scheduler/async/GcExecutorLauncher.java | 18 +- .../aurora/scheduler/async/OfferQueue.java | 125 ++----- .../aurora/scheduler/async/Preemptor.java | 67 ++-- .../async/RandomJitterReturnDelay.java | 4 +- .../aurora/scheduler/async/TaskScheduler.java | 10 +- .../events/NotifyingSchedulingFilter.java | 7 +- .../scheduler/filter/SchedulingFilter.java | 7 +- .../scheduler/filter/SchedulingFilterImpl.java | 86 ++--- .../apache/aurora/scheduler/http/Offers.java | 4 +- .../scheduler/mesos/MesosSchedulerImpl.java | 44 ++- .../scheduler/state/MaintenanceController.java | 1 + .../aurora/scheduler/state/TaskAssigner.java | 19 +- .../scheduler/stats/AsyncStatsModule.java | 9 +- .../aurora/scheduler/UserTaskLauncherTest.java | 13 +- .../scheduler/async/GcExecutorLauncherTest.java | 24 +- .../scheduler/async/OfferQueueImplTest.java | 136 +++----- .../scheduler/async/PreemptorImplTest.java | 53 +-- .../scheduler/async/TaskSchedulerImplTest.java | 9 +- .../scheduler/async/TaskSchedulerTest.java | 107 +++--- .../events/NotifyingSchedulingFilterTest.java | 
14 +- .../filter/SchedulingFilterImplTest.java | 335 +++++++++---------- .../scheduler/mesos/MesosSchedulerImplTest.java | 64 ++-- .../scheduler/state/TaskAssignerImplTest.java | 35 +- 26 files changed, 625 insertions(+), 638 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/HostOffer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/HostOffer.java b/src/main/java/org/apache/aurora/scheduler/HostOffer.java new file mode 100644 index 0000000..5056b60 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/HostOffer.java @@ -0,0 +1,66 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler; + +import java.util.Objects; + +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; + +import static java.util.Objects.requireNonNull; + +import static org.apache.mesos.Protos.Offer; + +/** + * An available resource in the cluster. 
+ */ +public class HostOffer { + private final Offer offer; + private final IHostAttributes hostAttributes; + + public HostOffer(Offer offer, IHostAttributes hostAttributes) { + this.offer = requireNonNull(offer); + this.hostAttributes = requireNonNull(hostAttributes); + } + + public Offer getOffer() { + return offer; + } + + public IHostAttributes getAttributes() { + return hostAttributes; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof HostOffer)) { + return false; + } + HostOffer other = (HostOffer) o; + return Objects.equals(offer, other.offer) + && Objects.equals(hostAttributes, other.hostAttributes); + } + + @Override + public int hashCode() { + return Objects.hash(offer, hostAttributes); + } + + @Override + public String toString() { + return com.google.common.base.Objects.toStringHelper(this) + .add("offer", offer) + .add("hostAttributes", hostAttributes) + .toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java index c13520a..cd55a6e 100644 --- a/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java +++ b/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java @@ -13,7 +13,6 @@ */ package org.apache.aurora.scheduler; -import org.apache.mesos.Protos.Offer; import org.apache.mesos.Protos.OfferID; import org.apache.mesos.Protos.TaskStatus; @@ -33,7 +32,7 @@ public interface TaskLauncher { * @return {@code false} if the launcher will not act on the offer, or {@code true} if the * launcher may accept the offer at some point in the future. */ - boolean willUse(Offer offer); + boolean willUse(HostOffer offer); /** * Informs the launcher that a status update has been received for a task. 
If the task is not http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java index 250c2df..e1b7d05 100644 --- a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java +++ b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java @@ -27,7 +27,6 @@ import org.apache.aurora.scheduler.async.OfferQueue; import org.apache.aurora.scheduler.base.Conversions; import org.apache.aurora.scheduler.base.SchedulerException; import org.apache.aurora.scheduler.state.StateManager; -import org.apache.mesos.Protos.Offer; import org.apache.mesos.Protos.OfferID; import org.apache.mesos.Protos.TaskStatus; @@ -56,7 +55,7 @@ class UserTaskLauncher implements TaskLauncher { } @Override - public boolean willUse(Offer offer) { + public boolean willUse(HostOffer offer) { requireNonNull(offer); offerQueue.addOffer(offer); http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java index 79d8d8d..e02921d 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java +++ b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java @@ -41,6 +41,7 @@ import org.apache.aurora.Protobufs; import org.apache.aurora.codec.ThriftBinaryCodec; import org.apache.aurora.codec.ThriftBinaryCodec.CodingException; import org.apache.aurora.gen.comm.AdjustRetainedTasks; +import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.TaskLauncher; import 
org.apache.aurora.scheduler.base.CommandUtil; import org.apache.aurora.scheduler.base.Query; @@ -52,7 +53,6 @@ import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.mesos.Protos; import org.apache.mesos.Protos.ExecutorID; import org.apache.mesos.Protos.ExecutorInfo; -import org.apache.mesos.Protos.Offer; import org.apache.mesos.Protos.OfferID; import org.apache.mesos.Protos.SlaveID; import org.apache.mesos.Protos.TaskID; @@ -192,20 +192,22 @@ public class GcExecutorLauncher implements TaskLauncher { Maps.transformValues(Tasks.mapById(tasksOnHost), Tasks.GET_STATUS))); } - private boolean sufficientResources(Offer offer) { - boolean sufficient = Resources.from(offer).greaterThanOrEqual(TOTAL_GC_EXECUTOR_RESOURCES); + private boolean sufficientResources(HostOffer offer) { + boolean sufficient = + Resources.from(offer.getOffer()).greaterThanOrEqual(TOTAL_GC_EXECUTOR_RESOURCES); if (!sufficient) { - LOG.warning("Offer for host " + offer.getHostname() + " is too small for a GC executor"); + LOG.warning("Offer for host " + offer.getOffer().getHostname() + + " is too small for a GC executor"); insufficientOffers.incrementAndGet(); } return sufficient; } @Override - public boolean willUse(final Offer offer) { + public boolean willUse(final HostOffer offer) { if (!settings.getGcExecutorPath().isPresent() || !sufficientResources(offer) - || !isTimeToCollect(offer.getHostname())) { + || !isTimeToCollect(offer.getOffer().getHostname())) { return false; } @@ -213,7 +215,9 @@ public class GcExecutorLauncher implements TaskLauncher { executor.execute(new Runnable() { @Override public void run() { - driver.launchTask(offer.getId(), makeGcTask(offer.getHostname(), offer.getSlaveId())); + driver.launchTask( + offer.getOffer().getId(), + makeGcTask(offer.getOffer().getHostname(), offer.getOffer().getSlaveId())); } }); offersConsumed.incrementAndGet(); 
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java index dd8a900..d2682cd 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java +++ b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java @@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.async; import java.util.Comparator; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ScheduledExecutorService; @@ -37,11 +36,11 @@ import com.twitter.common.quantity.Time; import com.twitter.common.stats.Stats; import org.apache.aurora.gen.MaintenanceMode; +import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; import org.apache.aurora.scheduler.mesos.Driver; -import org.apache.aurora.scheduler.state.MaintenanceController; -import org.apache.mesos.Protos.Offer; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.mesos.Protos.OfferID; import org.apache.mesos.Protos.SlaveID; import org.apache.mesos.Protos.TaskInfo; @@ -64,7 +63,7 @@ public interface OfferQueue extends EventSubscriber { * * @param offer Newly-available resource offer. */ - void addOffer(Offer offer); + void addOffer(HostOffer offer); /** * Invalidates an offer. This indicates that the scheduler should not attempt to match any @@ -104,7 +103,7 @@ public interface OfferQueue extends EventSubscriber { * The delay is calculated for each offer that is received, so the return delay may be * fixed or variable. 
*/ - interface OfferReturnDelay extends Supplier> { + interface OfferReturnDelay extends Supplier> { } /** @@ -120,51 +119,6 @@ public interface OfferQueue extends EventSubscriber { } } - /** - * Encapsulate an offer from a host, and the host's maintenance mode. - */ - class HostOffer { - private final Offer offer; - - // TODO(wfarner): Replace this with HostAttributes for more use of this caching. - private final MaintenanceMode mode; - - public HostOffer(Offer offer, MaintenanceMode mode) { - this.offer = requireNonNull(offer); - this.mode = requireNonNull(mode); - } - - public Offer getOffer() { - return offer; - } - - public MaintenanceMode getMode() { - return mode; - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof HostOffer)) { - return false; - } - HostOffer other = (HostOffer) o; - return Objects.equals(offer, other.offer) && mode == other.mode; - } - - @Override - public int hashCode() { - return Objects.hash(offer, mode); - } - - @Override - public String toString() { - return com.google.common.base.Objects.toStringHelper(this) - .add("offer", offer) - .add("mode", mode) - .toString(); - } - } - class OfferQueueImpl implements OfferQueue { private static final Logger LOG = Logger.getLogger(OfferQueueImpl.class.getName()); @@ -174,41 +128,36 @@ public interface OfferQueue extends EventSubscriber { private final Driver driver; private final OfferReturnDelay returnDelay; private final ScheduledExecutorService executor; - private final MaintenanceController maintenance; @Inject - OfferQueueImpl(Driver driver, - OfferReturnDelay returnDelay, - ScheduledExecutorService executor, - MaintenanceController maintenance) { - - this.driver = driver; - this.returnDelay = returnDelay; - this.executor = executor; - this.maintenance = maintenance; + OfferQueueImpl(Driver driver, OfferReturnDelay returnDelay, ScheduledExecutorService executor) { + this.driver = requireNonNull(driver); + this.returnDelay = requireNonNull(returnDelay); + 
this.executor = requireNonNull(executor); } @Override - public void addOffer(final Offer offer) { + public void addOffer(final HostOffer offer) { // We run a slight risk of a race here, which is acceptable. The worst case is that we // temporarily hold two offers for the same host, which should be corrected when we return // them after the return delay. // There's also a chance that we return an offer for compaction ~simultaneously with the // same-host offer being canceled/returned. This is also fine. - Optional sameSlave = hostOffers.get(offer.getSlaveId()); + Optional sameSlave = hostOffers.get(offer.getOffer().getSlaveId()); if (sameSlave.isPresent()) { // If there are existing offers for the slave, decline all of them so the master can // compact all of those offers into a single offer and send them back. - LOG.info("Returning offers for " + offer.getSlaveId().getValue() + " for compaction."); - decline(offer.getId()); - removeAndDecline(sameSlave.get().offer.getId()); + LOG.info("Returning offers for " + offer.getOffer().getSlaveId().getValue() + + " for compaction."); + decline(offer.getOffer().getId()); + removeAndDecline(sameSlave.get().getOffer().getId()); } else { - hostOffers.add(new HostOffer(offer, maintenance.getMode(offer.getHostname()))); + hostOffers.add(offer); executor.schedule( new Runnable() { @Override public void run() { - removeAndDecline(offer.getId()); + removeAndDecline(offer.getOffer().getId()); } }, returnDelay.get().as(Time.MILLISECONDS), @@ -252,7 +201,7 @@ public interface OfferQueue extends EventSubscriber { */ @Subscribe public void hostAttributesChanged(HostAttributesChanged change) { - hostOffers.updateHostMode(change.getAttributes().getHost(), change.getAttributes().getMode()); + hostOffers.updateHostAttributes(change.getAttributes()); } /** @@ -280,12 +229,12 @@ public interface OfferQueue extends EventSubscriber { .onResultOf(new Function() { @Override public MaintenanceMode apply(HostOffer offer) { - return offer.mode; + 
return offer.getAttributes().getMode(); } }) .compound(Ordering.arbitrary()); - private final Set hostOffers = new ConcurrentSkipListSet<>(PREFERENCE_COMPARATOR); + private final Set offers = new ConcurrentSkipListSet<>(PREFERENCE_COMPARATOR); private final Map offersById = Maps.newHashMap(); private final Map offersBySlave = Maps.newHashMap(); private final Map offersByHost = Maps.newHashMap(); @@ -293,7 +242,7 @@ public interface OfferQueue extends EventSubscriber { HostOffers() { // Potential gotcha - since this is a ConcurrentSkipListSet, size() is more expensive. // Could track this separately if it turns out to pose problems. - Stats.exportSize("outstanding_offers", hostOffers); + Stats.exportSize("outstanding_offers", offers); } synchronized Optional get(SlaveID slaveId) { @@ -301,37 +250,37 @@ public interface OfferQueue extends EventSubscriber { } synchronized void add(HostOffer offer) { - hostOffers.add(offer); - offersById.put(offer.offer.getId(), offer); - offersBySlave.put(offer.offer.getSlaveId(), offer); - offersByHost.put(offer.offer.getHostname(), offer); + offers.add(offer); + offersById.put(offer.getOffer().getId(), offer); + offersBySlave.put(offer.getOffer().getSlaveId(), offer); + offersByHost.put(offer.getOffer().getHostname(), offer); } synchronized boolean remove(OfferID id) { HostOffer removed = offersById.remove(id); if (removed != null) { - hostOffers.remove(removed); - offersBySlave.remove(removed.offer.getSlaveId()); - offersByHost.remove(removed.offer.getHostname()); + offers.remove(removed); + offersBySlave.remove(removed.getOffer().getSlaveId()); + offersByHost.remove(removed.getOffer().getHostname()); } return removed != null; } - synchronized void updateHostMode(String hostName, MaintenanceMode mode) { - HostOffer offer = offersByHost.remove(hostName); + synchronized void updateHostAttributes(IHostAttributes attributes) { + HostOffer offer = offersByHost.remove(attributes.getHost()); if (offer != null) { // Remove and re-add a 
host's offer to re-sort based on its new hostStatus - remove(offer.offer.getId()); - add(new HostOffer(offer.offer, mode)); + remove(offer.getOffer().getId()); + add(new HostOffer(offer.getOffer(), attributes)); } } synchronized Iterable getWeaklyConsistentOffers() { - return Iterables.unmodifiableIterable(hostOffers); + return Iterables.unmodifiableIterable(offers); } synchronized void clear() { - hostOffers.clear(); + offers.clear(); offersById.clear(); offersBySlave.clear(); offersByHost.clear(); @@ -345,17 +294,17 @@ public interface OfferQueue extends EventSubscriber { // It's important that this method is not called concurrently - doing so would open up the // possibility of a race between the same offers being accepted by different threads. - for (HostOffer hostOffer : hostOffers.getWeaklyConsistentOffers()) { - Optional assignment = acceptor.apply(hostOffer); + for (HostOffer offer : hostOffers.getWeaklyConsistentOffers()) { + Optional assignment = acceptor.apply(offer); if (assignment.isPresent()) { // Guard against an offer being removed after we grabbed it from the iterator. // If that happens, the offer will not exist in hostOffers, and we can immediately // send it back to LOST for quick reschedule. // Removing while iterating counts on the use of a weakly-consistent iterator being used, // which is a feature of ConcurrentSkipListSet. 
- if (hostOffers.remove(hostOffer.offer.getId())) { + if (hostOffers.remove(offer.getOffer().getId())) { try { - driver.launchTask(hostOffer.offer.getId(), assignment.get()); + driver.launchTask(offer.getOffer().getId(), assignment.get()); return true; } catch (IllegalStateException e) { // TODO(William Farner): Catch only the checked exception produced by Driver http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java index a17738e..1d337f6 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java +++ b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java @@ -40,19 +40,20 @@ import com.google.common.collect.Sets; import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Time; import com.twitter.common.stats.Stats; +import com.twitter.common.stats.StatsProvider; import com.twitter.common.util.Clock; -import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.ResourceSlot; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.filter.SchedulingFilter; -import org.apache.aurora.scheduler.state.MaintenanceController; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import static java.lang.annotation.ElementType.FIELD; @@ -63,8 +64,9 @@ 
import static java.util.Objects.requireNonNull; import static org.apache.aurora.gen.ScheduleStatus.PENDING; import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING; -import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer; import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED; +import static org.apache.aurora.scheduler.storage.Storage.StoreProvider; +import static org.apache.aurora.scheduler.storage.Storage.Work; /** * Preempts active tasks in favor of higher priority tasks. @@ -130,7 +132,7 @@ public interface Preemptor { private final SchedulingFilter schedulingFilter; private final Amount preemptionCandidacyDelay; private final Clock clock; - private final MaintenanceController maintenance; + private final AtomicLong missingAttributes; /** * Creates a new preemptor. @@ -151,7 +153,7 @@ public interface Preemptor { SchedulingFilter schedulingFilter, @PreemptionDelay Amount preemptionCandidacyDelay, Clock clock, - MaintenanceController maintenance) { + StatsProvider statsProvider) { this.storage = requireNonNull(storage); this.stateManager = requireNonNull(stateManager); @@ -159,7 +161,7 @@ public interface Preemptor { this.schedulingFilter = requireNonNull(schedulingFilter); this.preemptionCandidacyDelay = requireNonNull(preemptionCandidacyDelay); this.clock = requireNonNull(clock); - this.maintenance = requireNonNull(maintenance); + missingAttributes = statsProvider.makeCounter("preemptor_missing_attributes"); } private List fetch(Query.Builder query, Predicate filter) { @@ -200,24 +202,24 @@ public interface Preemptor { private static final Function OFFER_TO_RESOURCE_SLOT = new Function() { @Override - public ResourceSlot apply(HostOffer hostOffer) { - return ResourceSlot.from(hostOffer.getOffer()); + public ResourceSlot apply(HostOffer offer) { + return ResourceSlot.from(offer.getOffer()); } }; private static final Function OFFER_TO_HOST = new Function() { @Override - public String apply(HostOffer hostOffer) { - return 
hostOffer.getOffer().getHostname(); + public String apply(HostOffer offer) { + return offer.getOffer().getHostname(); } }; - private static final Function OFFER_TO_MODE = - new Function() { + private static final Function OFFER_TO_ATTRIBUTES = + new Function() { @Override - public MaintenanceMode apply(HostOffer hostOffer) { - return hostOffer.getMode(); + public IHostAttributes apply(HostOffer offer) { + return offer.getAttributes(); } }; @@ -255,18 +257,17 @@ public interface Preemptor { // us. return Optional.absent(); } - MaintenanceMode mode = - Iterables.getOnlyElement(FluentIterable.from(offers).transform(OFFER_TO_MODE).toSet()); + IHostAttributes attributes = Iterables.getOnlyElement( + FluentIterable.from(offers).transform(OFFER_TO_ATTRIBUTES).toSet()); - Set vetos = schedulingFilter.filter( + Set vetoes = schedulingFilter.filter( slackResources, - host, - mode, + attributes, pendingTask.getTask(), pendingTask.getTaskId(), attributeAggregate); - if (vetos.isEmpty()) { + if (vetoes.isEmpty()) { return Optional.>of(ImmutableSet.of()); } } @@ -289,26 +290,40 @@ public interface Preemptor { ResourceSlot.sum(Iterables.transform(toPreemptTasks, TASK_TO_RESOURCES)), slackResources); - Set vetos = schedulingFilter.filter( + Optional attributes = getHostAttributes(host); + if (!attributes.isPresent()) { + missingAttributes.incrementAndGet(); + continue; + } + + Set vetoes = schedulingFilter.filter( totalResource, - host, - maintenance.getMode(host), + attributes.get(), pendingTask.getTask(), pendingTask.getTaskId(), attributeAggregate); - if (vetos.isEmpty()) { + if (vetoes.isEmpty()) { return Optional.>of(ImmutableSet.copyOf(toPreemptTasks)); } } return Optional.absent(); } + private Optional getHostAttributes(final String host) { + return storage.weaklyConsistentRead(new Work.Quiet>() { + @Override + public Optional apply(StoreProvider storeProvider) { + return storeProvider.getAttributeStore().getHostAttributes(host); + } + }); + } + private static final 
Function OFFER_TO_SLAVE_ID = new Function() { @Override - public String apply(HostOffer hostOffer) { - return hostOffer.getOffer().getSlaveId().getValue(); + public String apply(HostOffer offer) { + return offer.getOffer().getSlaveId().getValue(); } }; http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java b/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java index d15d9e6..2accb4e 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java +++ b/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java @@ -43,7 +43,7 @@ class RandomJitterReturnDelay implements OfferReturnDelay { } @Override - public Amount get() { - return Amount.of(minHoldTimeMs + random.nextInt(maxJitterWindowMs), Time.MILLISECONDS); + public Amount get() { + return Amount.of((long) minHoldTimeMs + random.nextInt(maxJitterWindowMs), Time.MILLISECONDS); } } http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java index b23457e..e2ba8b8 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java +++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java @@ -41,6 +41,7 @@ import com.twitter.common.stats.Stats; import com.twitter.common.stats.StatsProvider; import com.twitter.common.util.Clock; +import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Tasks; import 
org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; @@ -65,7 +66,6 @@ import static java.util.Objects.requireNonNull; import static org.apache.aurora.gen.ScheduleStatus.LOST; import static org.apache.aurora.gen.ScheduleStatus.PENDING; -import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer; /** * Enables scheduling and preemption of tasks. @@ -134,20 +134,20 @@ public interface TaskScheduler extends EventSubscriber { return new Function>() { @Override - public Optional apply(HostOffer hostOffer) { + public Optional apply(HostOffer offer) { Optional reservedTaskId = - reservations.getSlaveReservation(hostOffer.getOffer().getSlaveId()); + reservations.getSlaveReservation(offer.getOffer().getSlaveId()); if (reservedTaskId.isPresent()) { if (taskId.equals(reservedTaskId.get())) { // Slave is reserved to satisfy this task. - return assigner.maybeAssign(hostOffer, task, attributeAggregate); + return assigner.maybeAssign(offer, task, attributeAggregate); } else { // Slave is reserved for another task. return Optional.absent(); } } else { // Slave is not reserved. 
- return assigner.maybeAssign(hostOffer, task, attributeAggregate); + return assigner.maybeAssign(offer, task, attributeAggregate); } } }; http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java index fc17cac..ca53303 100644 --- a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java +++ b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java @@ -20,11 +20,11 @@ import java.util.Set; import javax.inject.Inject; import javax.inject.Qualifier; -import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.scheduler.ResourceSlot; import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed; import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.filter.SchedulingFilter; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import static java.lang.annotation.ElementType.FIELD; @@ -60,13 +60,12 @@ class NotifyingSchedulingFilter implements SchedulingFilter { @Override public Set filter( ResourceSlot offer, - String slaveHost, - MaintenanceMode mode, + IHostAttributes hostAttributes, ITaskConfig task, String taskId, AttributeAggregate jobState) { - Set vetoes = delegate.filter(offer, slaveHost, mode, task, taskId, jobState); + Set vetoes = delegate.filter(offer, hostAttributes, task, taskId, jobState); if (!vetoes.isEmpty()) { eventSink.post(new Vetoed(taskId, vetoes)); } http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java index c37272c..c1c5f26 100644 --- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java +++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java @@ -18,8 +18,8 @@ import java.util.Set; import com.google.common.annotations.VisibleForTesting; -import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.scheduler.ResourceSlot; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; /** @@ -105,8 +105,6 @@ public interface SchedulingFilter { * Applies a task against the filter with the given resources, and on the host. * * @param offer Resources offered. - * @param slaveHost Host that the resources are associated with. - * @param mode Maintenance mode of the host that the resources are associated with. * @param task Task. * @param taskId Canonical ID of the task. * @param attributeAggregate Attribute information for tasks in the job containing {@code task}. 
@@ -115,8 +113,7 @@ public interface SchedulingFilter { */ Set filter( ResourceSlot offer, - String slaveHost, - MaintenanceMode mode, + IHostAttributes attributes, ITaskConfig task, String taskId, AttributeAggregate attributeAggregate); http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java index 0533baa..cc6b53b 100644 --- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java @@ -17,8 +17,6 @@ import java.util.Comparator; import java.util.EnumSet; import java.util.Set; -import javax.inject.Inject; - import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Optional; @@ -33,16 +31,10 @@ import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.gen.TaskConstraint; import org.apache.aurora.scheduler.ResourceSlot; import org.apache.aurora.scheduler.configuration.ConfigurationManager; -import org.apache.aurora.scheduler.storage.AttributeStore; -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.Storage.StoreProvider; -import org.apache.aurora.scheduler.storage.Storage.Work.Quiet; -import org.apache.aurora.scheduler.storage.entities.IAttribute; import org.apache.aurora.scheduler.storage.entities.IConstraint; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; -import static java.util.Objects.requireNonNull; - import static org.apache.aurora.gen.MaintenanceMode.DRAINED; import static org.apache.aurora.gen.MaintenanceMode.DRAINING; import 
static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE; @@ -65,18 +57,6 @@ public class SchedulingFilterImpl implements SchedulingFilter { private static final Set VETO_MODES = EnumSet.of(DRAINING, DRAINED); - private final Storage storage; - - /** - * Creates a new scheduling filter. - * - * @param storage Interface to accessing the task store. - */ - @Inject - public SchedulingFilterImpl(Storage storage) { - this.storage = requireNonNull(storage); - } - /** * A function that may veto a task. */ @@ -187,7 +167,7 @@ public class SchedulingFilterImpl implements SchedulingFilter { private FilterRule getConstraintFilter( final AttributeAggregate jobState, - final String slaveHost) { + final IHostAttributes offerAttributes) { return new FilterRule() { @Override @@ -196,32 +176,23 @@ public class SchedulingFilterImpl implements SchedulingFilter { return ImmutableList.of(); } - // In the interest of performance, we perform a weakly consistent read here. The biggest - // risk of this is that we might schedule against stale host attributes, or we might fail - // to correctly satisfy a diversity constraint. Given that the likelihood is relatively low - // for both of these, and the impact is also low, the weak consistency is acceptable. - return storage.weaklyConsistentRead(new Quiet>() { - @Override - public Iterable apply(final StoreProvider storeProvider) { - ConstraintFilter constraintFilter = new ConstraintFilter( - jobState, - AttributeStore.Util.attributesOrNone(storeProvider, slaveHost)); - ImmutableList.Builder vetoes = ImmutableList.builder(); - for (IConstraint constraint : VALUES_FIRST.sortedCopy(task.getConstraints())) { - Optional veto = constraintFilter.getVeto(constraint); - if (veto.isPresent()) { - vetoes.add(veto.get()); - if (isValueConstraint(constraint)) { - // Break when a value constraint mismatch is found to avoid other - // potentially-expensive operations to satisfy other constraints. 
- break; - } - } + ConstraintFilter constraintFilter = new ConstraintFilter( + jobState, + offerAttributes.getAttributes()); + ImmutableList.Builder vetoes = ImmutableList.builder(); + for (IConstraint constraint : VALUES_FIRST.sortedCopy(task.getConstraints())) { + Optional veto = constraintFilter.getVeto(constraint); + if (veto.isPresent()) { + vetoes.add(veto.get()); + if (isValueConstraint(constraint)) { + // Break when a value constraint mismatch is found to avoid other + // potentially-expensive operations to satisfy other constraints. + break; } - - return vetoes.build(); } - }); + } + + return vetoes.build(); } }; } @@ -240,38 +211,31 @@ public class SchedulingFilterImpl implements SchedulingFilter { return builder.build(); } - private boolean isDedicated(final String slaveHost) { - Iterable slaveAttributes = - storage.weaklyConsistentRead(new Quiet>() { - @Override - public Iterable apply(final StoreProvider storeProvider) { - return AttributeStore.Util.attributesOrNone(storeProvider, slaveHost); - } - }); - - return Iterables.any(slaveAttributes, new ConstraintFilter.NameFilter(DEDICATED_ATTRIBUTE)); + private boolean isDedicated(IHostAttributes attributes) { + return Iterables.any( + attributes.getAttributes(), + new ConstraintFilter.NameFilter(DEDICATED_ATTRIBUTE)); } @Override public Set filter( ResourceSlot offer, - String slaveHost, - MaintenanceMode mode, + IHostAttributes attributes, ITaskConfig task, String taskId, AttributeAggregate attributeAggregate) { - if (!ConfigurationManager.isDedicated(task) && isDedicated(slaveHost)) { + if (!ConfigurationManager.isDedicated(task) && isDedicated(attributes)) { return ImmutableSet.of(DEDICATED_HOST_VETO); } - Optional maintenanceVeto = getMaintenanceVeto(mode); + Optional maintenanceVeto = getMaintenanceVeto(attributes.getMode()); if (maintenanceVeto.isPresent()) { return maintenanceVeto.asSet(); } return ImmutableSet.builder() - .addAll(getConstraintFilter(attributeAggregate, slaveHost).apply(task)) + 
.addAll(getConstraintFilter(attributeAggregate, attributes).apply(task)) .addAll(getResourceVetoes(offer, task)) .build(); } http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/http/Offers.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/Offers.java b/src/main/java/org/apache/aurora/scheduler/http/Offers.java index 446dc74..6d75c3a 100644 --- a/src/main/java/org/apache/aurora/scheduler/http/Offers.java +++ b/src/main/java/org/apache/aurora/scheduler/http/Offers.java @@ -27,14 +27,14 @@ import com.google.common.base.Function; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableMap; +import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.async.OfferQueue; import org.apache.mesos.Protos.Attribute; import org.apache.mesos.Protos.ExecutorID; -import org.apache.mesos.Protos.Offer; import org.apache.mesos.Protos.Resource; import org.apache.mesos.Protos.Value.Range; -import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer; +import static org.apache.mesos.Protos.Offer; /** * Servlet that exposes resource offers that the scheduler is currently retaining. 
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java index ffcbc97..ffc30bb 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java @@ -30,6 +30,7 @@ import com.twitter.common.inject.TimedInterceptor.Timed; import com.twitter.common.stats.Stats; import org.apache.aurora.GuiceUtils.AllowUnchecked; +import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.TaskLauncher; import org.apache.aurora.scheduler.base.SchedulerException; import org.apache.aurora.scheduler.events.EventSink; @@ -39,10 +40,10 @@ import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import org.apache.aurora.scheduler.storage.Storage.MutateWork; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.mesos.Protos.ExecutorID; import org.apache.mesos.Protos.FrameworkID; import org.apache.mesos.Protos.MasterInfo; -import org.apache.mesos.Protos.Offer; import org.apache.mesos.Protos.OfferID; import org.apache.mesos.Protos.SlaveID; import org.apache.mesos.Protos.TaskStatus; @@ -55,6 +56,8 @@ import static java.lang.annotation.ElementType.PARAMETER; import static java.lang.annotation.RetentionPolicy.RUNTIME; import static java.util.Objects.requireNonNull; +import static org.apache.mesos.Protos.Offer; + /** * Location for communication with mesos. */ @@ -90,7 +93,7 @@ class MesosSchedulerImpl implements Scheduler { * @param storage Store to save host attributes into. 
* @param lifecycle Application lifecycle manager. * @param taskLaunchers Task launchers, which will be used in order. Calls to - * {@link TaskLauncher#willUse(Offer)} and + * {@link TaskLauncher#willUse(HostOffer)} and * {@link TaskLauncher#statusUpdate(TaskStatus)} are propagated to provided * launchers, ceasing after the first match (based on a return value of * {@code true}. @@ -153,37 +156,30 @@ class MesosSchedulerImpl implements Scheduler { public void resourceOffers(SchedulerDriver driver, final List offers) { Preconditions.checkState(isRegistered, "Must be registered before receiving offers."); - // Store all host attributes in a single write operation to prevent other threads from - // securing the storage lock between saves. We also save the host attributes before passing - // offers elsewhere to ensure that host attributes are available before attempting to - // schedule tasks associated with offers. - // TODO(wfarner): Reconsider the requirements here, augment the task scheduler to skip over - // offers when the host attributes cannot be found. (AURORA-137) - executor.execute(new Runnable() { @Override public void run() { + // TODO(wfarner): Reconsider the requirements here, augment the task scheduler to skip over + // offers when the host attributes cannot be found. 
(AURORA-137) storage.write(new MutateWork.NoResult.Quiet() { @Override protected void execute(MutableStoreProvider storeProvider) { - for (final Offer offer : offers) { - storeProvider.getAttributeStore().saveHostAttributes( - AttributeStore.Util.mergeOffer(storeProvider.getAttributeStore(), offer)); + for (Offer offer : offers) { + IHostAttributes attributes = + AttributeStore.Util.mergeOffer(storeProvider.getAttributeStore(), offer); + storeProvider.getAttributeStore().saveHostAttributes(attributes); + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, String.format("Received offer: %s", offer)); + } + totalResourceOffers.incrementAndGet(); + for (TaskLauncher launcher : taskLaunchers) { + if (launcher.willUse(new HostOffer(offer, attributes))) { + break; + } + } } } }); - - for (Offer offer : offers) { - if (LOG.isLoggable(Level.FINE)) { - LOG.log(Level.FINE, String.format("Received offer: %s", offer)); - } - totalResourceOffers.incrementAndGet(); - for (TaskLauncher launcher : taskLaunchers) { - if (launcher.willUse(offer)) { - break; - } - } - } } }); } http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java index 077699f..86440eb 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java +++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java @@ -52,6 +52,7 @@ import static org.apache.aurora.gen.MaintenanceMode.DRAINING; * All state-changing functions return their results. Additionally, all state-changing functions * will ignore requests to change state of unknown hosts and subsequently omit these hosts from * return values. 
+ * TODO(wfarner): Convert use of HostStatus in this API to IHostStatus (immutable). */ public interface MaintenanceController { http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java index 9c9b659..77db411 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java +++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java @@ -20,6 +20,7 @@ import javax.inject.Inject; import com.google.common.base.Optional; +import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.ResourceSlot; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.configuration.Resources; @@ -29,12 +30,11 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; import org.apache.aurora.scheduler.mesos.MesosTaskFactory; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.mesos.Protos.Offer; import org.apache.mesos.Protos.TaskInfo; import static java.util.Objects.requireNonNull; -import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer; +import static org.apache.mesos.Protos.Offer; /** * Responsible for matching a task against an offer. @@ -45,13 +45,13 @@ public interface TaskAssigner { * Tries to match a task against an offer. If a match is found, the assigner should * make the appropriate changes to the task and provide a non-empty result. * - * @param hostOffer The resource offer. + * @param offer The resource offer. * @param task The task to match against and optionally assign. * @param attributeAggregate Attribute information for tasks in the job containing {@code task}. 
* @return Instructions for launching the task if matching and assignment were successful. */ Optional maybeAssign( - HostOffer hostOffer, + HostOffer offer, IScheduledTask task, AttributeAggregate attributeAggregate); @@ -89,21 +89,20 @@ public interface TaskAssigner { @Override public Optional maybeAssign( - HostOffer hostOffer, + HostOffer offer, IScheduledTask task, AttributeAggregate attributeAggregate) { Set vetoes = filter.filter( - ResourceSlot.from(hostOffer.getOffer()), - hostOffer.getOffer().getHostname(), - hostOffer.getMode(), + ResourceSlot.from(offer.getOffer()), + offer.getAttributes(), task.getAssignedTask().getTask(), Tasks.id(task), attributeAggregate); if (vetoes.isEmpty()) { - return Optional.of(assign(hostOffer.getOffer(), task)); + return Optional.of(assign(offer.getOffer(), task)); } else { - LOG.fine("Slave " + hostOffer.getOffer().getHostname() + " vetoed task " + Tasks.id(task) + LOG.fine("Slave " + offer.getOffer().getHostname() + " vetoed task " + Tasks.id(task) + ": " + vetoes); return Optional.absent(); } http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java index 844a38a..1c9904c 100644 --- a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java +++ b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java @@ -35,6 +35,7 @@ import com.twitter.common.quantity.Data; import com.twitter.common.quantity.Time; import org.apache.aurora.gen.ResourceAggregate; +import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.async.OfferQueue; import org.apache.aurora.scheduler.base.AsyncUtil; import org.apache.aurora.scheduler.base.Conversions; @@ -49,8 +50,6 @@ import static 
java.lang.annotation.ElementType.PARAMETER; import static java.lang.annotation.RetentionPolicy.RUNTIME; import static java.util.Objects.requireNonNull; -import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer; - /** * Module to configure export of cluster-wide resource allocation and consumption statistics. */ @@ -115,13 +114,13 @@ public class AsyncStatsModule extends AbstractModule { private static final Function TO_RESOURCE = new Function() { @Override - public MachineResource apply(HostOffer hostOffer) { - Resources resources = Resources.from(hostOffer.getOffer()); + public MachineResource apply(HostOffer offer) { + Resources resources = Resources.from(offer.getOffer()); IResourceAggregate quota = IResourceAggregate.build(new ResourceAggregate() .setNumCpus(resources.getNumCpus()) .setRamMb(resources.getRam().as(Data.MB)) .setDiskMb(resources.getDisk().as(Data.MB))); - return new MachineResource(quota, Conversions.isDedicated(hostOffer.getOffer())); + return new MachineResource(quota, Conversions.isDedicated(offer.getOffer())); } }; http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java index 083a635..4673e80 100644 --- a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java +++ b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java @@ -22,14 +22,15 @@ import com.google.common.collect.Iterables; import com.twitter.common.collections.Pair; import com.twitter.common.testing.easymock.EasyMockTest; +import org.apache.aurora.gen.HostAttributes; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.scheduler.async.OfferQueue; import org.apache.aurora.scheduler.configuration.Resources; import 
org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.storage.Storage.StorageException; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.mesos.Protos.Attribute; import org.apache.mesos.Protos.FrameworkID; -import org.apache.mesos.Protos.Offer; import org.apache.mesos.Protos.OfferID; import org.apache.mesos.Protos.Resource; import org.apache.mesos.Protos.SlaveID; @@ -47,6 +48,7 @@ import org.junit.Test; import static org.apache.aurora.gen.ScheduleStatus.FAILED; import static org.apache.aurora.gen.ScheduleStatus.RUNNING; import static org.apache.aurora.scheduler.configuration.ConfigurationManager.HOST_CONSTRAINT; +import static org.apache.mesos.Protos.Offer; import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertTrue; @@ -60,7 +62,7 @@ public class UserTaskLauncherTest extends EasyMockTest { private static final String TASK_ID_A = "task_id_a"; private static final OfferID OFFER_ID = OfferID.newBuilder().setValue("OfferId").build(); - private static final Offer OFFER = createOffer(SLAVE_ID, SLAVE_HOST_1, 4, 1024, 1024); + private static final HostOffer OFFER = createOffer(SLAVE_ID, SLAVE_HOST_1, 4, 1024, 1024); private OfferQueue offerQueue; private StateManager stateManager; @@ -171,13 +173,13 @@ public class UserTaskLauncherTest extends EasyMockTest { launcher.statusUpdate(status); } - private static Offer createOffer(SlaveID slave, String slaveHost, double cpu, + private static HostOffer createOffer(SlaveID slave, String slaveHost, double cpu, double ramMb, double diskMb) { return createOffer(slave, slaveHost, cpu, ramMb, diskMb, ImmutableSet.>of()); } - private static Offer createOffer(SlaveID slave, String slaveHost, double cpu, + private static HostOffer createOffer(SlaveID slave, String slaveHost, double cpu, double ramMb, double diskMb, Set> ports) { Ranges portRanges = Ranges.newBuilder() @@ -189,7 +191,7 @@ public class UserTaskLauncherTest extends EasyMockTest { 
})) .build(); - return Offer.newBuilder() + Offer mesosOffer = Offer.newBuilder() .addResources(Resource.newBuilder().setType(Type.SCALAR).setName(Resources.CPUS) .setScalar(Scalar.newBuilder().setValue(cpu))) .addResources(Resource.newBuilder().setType(Type.SCALAR).setName(Resources.RAM_MB) @@ -206,5 +208,6 @@ public class UserTaskLauncherTest extends EasyMockTest { .setFrameworkId(FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build()) .setId(OFFER_ID) .build(); + return new HostOffer(mesosOffer, IHostAttributes.build(new HostAttributes())); } } http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java index 758a8d4..422d5a9 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java @@ -30,20 +30,22 @@ import com.twitter.common.util.testing.FakeClock; import org.apache.aurora.gen.AssignedTask; import org.apache.aurora.gen.ExecutorConfig; +import org.apache.aurora.gen.HostAttributes; import org.apache.aurora.gen.Identity; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.gen.comm.AdjustRetainedTasks; +import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.async.GcExecutorLauncher.GcExecutorSettings; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.configuration.Resources; import org.apache.aurora.scheduler.mesos.Driver; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import 
org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; import org.apache.mesos.Protos.FrameworkID; -import org.apache.mesos.Protos.Offer; import org.apache.mesos.Protos.OfferID; import org.apache.mesos.Protos.Resource; import org.apache.mesos.Protos.SlaveID; @@ -58,6 +60,7 @@ import static org.apache.aurora.gen.ScheduleStatus.FAILED; import static org.apache.aurora.scheduler.async.GcExecutorLauncher.INSUFFICIENT_OFFERS_STAT_NAME; import static org.apache.aurora.scheduler.async.GcExecutorLauncher.LOST_TASKS_STAT_NAME; import static org.apache.aurora.scheduler.async.GcExecutorLauncher.SYSTEM_TASK_PREFIX; +import static org.apache.mesos.Protos.Offer; import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -67,13 +70,15 @@ public class GcExecutorLauncherTest extends EasyMockTest { private static final String HOST = "slave-host"; - private static final Offer OFFER = Offer.newBuilder() + private static final Offer MESOS_OFFER = Offer.newBuilder() .setSlaveId(SlaveID.newBuilder().setValue("slave-id")) .setHostname(HOST) .setFrameworkId(FrameworkID.newBuilder().setValue("framework-id").build()) .setId(OfferID.newBuilder().setValue("offer-id")) .addAllResources(GcExecutorLauncher.TOTAL_GC_EXECUTOR_RESOURCES.toResourceList()) .build(); + private static final HostOffer OFFER = + new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes())); private static final String JOB_A = "jobA"; private static final String TASK_UUID = "gc"; @@ -166,18 +171,19 @@ public class GcExecutorLauncherTest extends EasyMockTest { Resources.subtract( GcExecutorLauncher.TOTAL_GC_EXECUTOR_RESOURCES, GcExecutorLauncher.EPSILON).toResourceList(); - Offer smallOffer = OFFER.toBuilder() + Offer smallOffer = MESOS_OFFER.toBuilder() .clearResources() .addAllResources(resources) .build(); assertEquals(0, insufficientOffers.get()); - 
assertFalse(gcExecutorLauncher.willUse(smallOffer)); + assertFalse(gcExecutorLauncher.willUse( + new HostOffer(smallOffer, IHostAttributes.build(new HostAttributes())))); assertEquals(1, insufficientOffers.get()); } private static TaskStatus makeStatus(String taskId) { return TaskStatus.newBuilder() - .setSlaveId(OFFER.getSlaveId()) + .setSlaveId(OFFER.getOffer().getSlaveId()) .setState(TaskState.TASK_RUNNING) .setTaskId(TaskID.newBuilder().setValue(taskId)) .build(); @@ -220,8 +226,12 @@ public class GcExecutorLauncherTest extends EasyMockTest { Maps.transformValues(Tasks.mapById(ImmutableSet.copyOf(tasks)), Tasks.GET_STATUS); AdjustRetainedTasks message = new AdjustRetainedTasks().setRetainedTasks(statuses); TaskInfo task = GcExecutorLauncher.makeGcTask( - HOST, OFFER.getSlaveId(), SETTINGS.getGcExecutorPath().get(), TASK_UUID, message); - driver.launchTask(OFFER.getId(), task); + HOST, + OFFER.getOffer().getSlaveId(), + SETTINGS.getGcExecutorPath().get(), + TASK_UUID, + message); + driver.launchTask(OFFER.getOffer().getId(), task); } private IScheduledTask makeTask(String jobName, ScheduleStatus status) { http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java index e2a198a..4cf602a 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java @@ -13,154 +13,124 @@ */ package org.apache.aurora.scheduler.async; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; import 
com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.base.Throwables; import com.google.common.testing.TearDown; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Time; import com.twitter.common.testing.easymock.EasyMockTest; -import com.twitter.common.util.concurrent.ExecutorServiceShutdown; +import org.apache.aurora.gen.HostAttributes; import org.apache.aurora.gen.MaintenanceMode; -import org.apache.aurora.scheduler.async.OfferQueue.LaunchException; +import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.async.OfferQueue.OfferQueueImpl; import org.apache.aurora.scheduler.async.OfferQueue.OfferReturnDelay; import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected; import org.apache.aurora.scheduler.mesos.Driver; -import org.apache.aurora.scheduler.state.MaintenanceController; -import org.apache.mesos.Protos.Offer; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.testing.FakeScheduledExecutor; import org.apache.mesos.Protos.TaskInfo; -import org.easymock.IAnswer; import org.junit.Before; import org.junit.Test; -import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer; +import static org.apache.aurora.gen.MaintenanceMode.DRAINING; +import static org.apache.aurora.gen.MaintenanceMode.NONE; import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class OfferQueueImplTest extends EasyMockTest { - private static final Amount RETURN_DELAY = Amount.of(1, Time.DAYS); + private static final Amount RETURN_DELAY = Amount.of(1L, Time.DAYS); private static final String HOST_A = "HOST_A"; - private static final Offer OFFER_A = Offers.makeOffer("OFFER_A", HOST_A); + private static final HostOffer OFFER_A = new HostOffer( + Offers.makeOffer("OFFER_A", 
HOST_A), + IHostAttributes.build(new HostAttributes().setMode(NONE))); private static final String HOST_B = "HOST_B"; - private static final Offer OFFER_B = Offers.makeOffer("OFFER_B", HOST_B); + private static final HostOffer OFFER_B = new HostOffer( + Offers.makeOffer("OFFER_B", HOST_B), + IHostAttributes.build(new HostAttributes().setMode(NONE))); private static final String HOST_C = "HOST_C"; - private static final Offer OFFER_C = Offers.makeOffer("OFFER_C", HOST_C); + private static final HostOffer OFFER_C = new HostOffer( + Offers.makeOffer("OFFER_C", HOST_C), + IHostAttributes.build(new HostAttributes().setMode(NONE))); private Driver driver; - private ScheduledExecutorService executor; - private ExecutorService testExecutor; - private MaintenanceController maintenanceController; + private FakeScheduledExecutor clock; private Function> offerAcceptor; private OfferQueueImpl offerQueue; @Before public void setUp() { driver = createMock(Driver.class); - ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).build(); - executor = Executors.newSingleThreadScheduledExecutor(threadFactory); - testExecutor = Executors.newCachedThreadPool(threadFactory); + ScheduledExecutorService executorMock = createMock(ScheduledExecutorService.class); + clock = FakeScheduledExecutor.scheduleExecutor(executorMock); + addTearDown(new TearDown() { @Override public void tearDown() throws Exception { - new ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute(); - new ExecutorServiceShutdown(testExecutor, Amount.of(1L, Time.SECONDS)).execute(); + clock.assertEmpty(); } }); - maintenanceController = createMock(MaintenanceController.class); offerAcceptor = createMock(new Clazz>>() { }); OfferReturnDelay returnDelay = new OfferReturnDelay() { @Override - public Amount get() { + public Amount get() { return RETURN_DELAY; } }; - offerQueue = new OfferQueueImpl(driver, returnDelay, executor, maintenanceController); + offerQueue = new OfferQueueImpl(driver, 
returnDelay, executorMock); } @Test - public void testNoDeadlock() throws Exception { - // Test that a blocked call to maintenanceController does not result in a deadlock between - // the intrinsic lock and the storage lock. - final CountDownLatch launchAttempted = new CountDownLatch(1); - expect(maintenanceController.getMode(HOST_A)).andAnswer(new IAnswer() { - @Override - public MaintenanceMode answer() throws InterruptedException { - launchAttempted.await(); - return MaintenanceMode.NONE; - } - }); - - control.replay(); + public void testOffersSorted() throws Exception { + // Ensures that non-DRAINING offers are preferred - the DRAINING offer would be tried last. - final CountDownLatch offerAdded = new CountDownLatch(1); - testExecutor.submit(new Runnable() { - @Override - public void run() { - offerQueue.addOffer(OFFER_A); - offerAdded.countDown(); - } - }); - testExecutor.submit(new Runnable() { - @Override - public void run() { - try { - offerQueue.launchFirst(offerAcceptor); - launchAttempted.countDown(); - } catch (LaunchException e) { - throw Throwables.propagate(e); - } - } - }); + HostOffer offerA = setMode(OFFER_A, DRAINING); + HostOffer offerC = setMode(OFFER_C, DRAINING); - launchAttempted.await(); - offerAdded.await(); - } + TaskInfo task = TaskInfo.getDefaultInstance(); + expect(offerAcceptor.apply(OFFER_B)).andReturn(Optional.of(task)); + driver.launchTask(OFFER_B.getOffer().getId(), task); - @Test - public void testOffersSorted() throws Exception { - MaintenanceMode modeA = MaintenanceMode.NONE; - MaintenanceMode modeB = MaintenanceMode.DRAINING; - MaintenanceMode modeC = MaintenanceMode.NONE; + driver.declineOffer(offerA.getOffer().getId()); + driver.declineOffer(offerC.getOffer().getId()); - HostOffer hostOfferA = new HostOffer(OFFER_A, modeA); - HostOffer hostOfferB = new HostOffer(OFFER_B, modeB); - HostOffer hostOfferC = new HostOffer(OFFER_C, modeC); + control.replay(); - expect(maintenanceController.getMode(HOST_A)).andReturn(modeA); - 
expect(maintenanceController.getMode(HOST_B)).andReturn(modeB); - expect(maintenanceController.getMode(HOST_C)).andReturn(modeC); - expect(offerAcceptor.apply(hostOfferA)).andReturn(Optional.absent()); - expect(offerAcceptor.apply(hostOfferB)).andReturn(Optional.absent()); - expect(offerAcceptor.apply(hostOfferC)).andReturn(Optional.absent()); + offerQueue.addOffer(offerA); + offerQueue.addOffer(OFFER_B); + offerQueue.addOffer(offerC); + assertTrue(offerQueue.launchFirst(offerAcceptor)); + clock.advance(RETURN_DELAY); + } + @Test + public void testFlushOffers() throws Exception { control.replay(); offerQueue.addOffer(OFFER_A); offerQueue.addOffer(OFFER_B); - offerQueue.addOffer(OFFER_C); + offerQueue.driverDisconnected(new DriverDisconnected()); assertFalse(offerQueue.launchFirst(offerAcceptor)); + clock.advance(RETURN_DELAY); } @Test - public void testFlushOffers() throws Exception { - expect(maintenanceController.getMode(HOST_A)).andReturn(MaintenanceMode.NONE); - expect(maintenanceController.getMode(HOST_B)).andReturn(MaintenanceMode.NONE); + public void testDeclineOffer() throws Exception { + driver.declineOffer(OFFER_A.getOffer().getId()); control.replay(); offerQueue.addOffer(OFFER_A); - offerQueue.addOffer(OFFER_B); - offerQueue.driverDisconnected(new DriverDisconnected()); - assertFalse(offerQueue.launchFirst(offerAcceptor)); + clock.advance(RETURN_DELAY); + } + + private static HostOffer setMode(HostOffer offer, MaintenanceMode mode) { + return new HostOffer( + offer.getOffer(), + IHostAttributes.build(offer.getAttributes().newBuilder().setMode(mode))); } } http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java index c0fa462..59bfbcb 100644 --- 
a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java @@ -28,6 +28,7 @@ import com.google.common.collect.Lists; import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Data; import com.twitter.common.quantity.Time; +import com.twitter.common.stats.StatsProvider; import com.twitter.common.testing.easymock.EasyMockTest; import com.twitter.common.util.testing.FakeClock; @@ -42,6 +43,7 @@ import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.gen.TaskEvent; +import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.ResourceSlot; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Tasks; @@ -49,7 +51,6 @@ import org.apache.aurora.scheduler.configuration.Resources; import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.filter.SchedulingFilterImpl; -import org.apache.aurora.scheduler.state.MaintenanceController; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.Storage; @@ -60,6 +61,7 @@ import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.apache.aurora.scheduler.storage.mem.MemStorage; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; import org.easymock.EasyMock; import org.easymock.IAnswer; import org.easymock.IExpectationSetters; @@ -70,7 +72,6 @@ import static org.apache.aurora.gen.MaintenanceMode.NONE; import static org.apache.aurora.gen.ScheduleStatus.PENDING; import static org.apache.aurora.gen.ScheduleStatus.RUNNING; 
import static org.apache.aurora.gen.ScheduleStatus.THROTTLED; -import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer; import static org.apache.aurora.scheduler.async.Preemptor.PreemptorImpl; import static org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; import static org.apache.mesos.Protos.Offer; @@ -103,7 +104,7 @@ public class PreemptorImplTest extends EasyMockTest { private StateManager stateManager; private SchedulingFilter schedulingFilter; private FakeClock clock; - private MaintenanceController maintenance; + private StatsProvider statsProvider; private OfferQueue offerQueue; private AttributeAggregate emptyJob; @@ -112,8 +113,8 @@ public class PreemptorImplTest extends EasyMockTest { storageUtil = new StorageTestUtil(this); storageUtil.expectOperations(); stateManager = createMock(StateManager.class); - maintenance = createMock(MaintenanceController.class); clock = new FakeClock(); + statsProvider = new FakeStatsProvider(); offerQueue = createMock(OfferQueue.class); emptyJob = new AttributeAggregate( Suppliers.ofInstance(ImmutableSet.of()), @@ -128,7 +129,7 @@ public class PreemptorImplTest extends EasyMockTest { schedulingFilter, PREEMPTION_DELAY, clock, - maintenance); + statsProvider); preemptor.findPreemptionSlotFor(pendingTask.getAssignedTask().getTaskId(), emptyJob); } @@ -150,12 +151,10 @@ public class PreemptorImplTest extends EasyMockTest { IScheduledTask.setFromBuilders(Arrays.asList(returnedTasks))); } - private void expectGetMaintenance(String host) { - expect(maintenance.getMode(host)).andReturn(MaintenanceMode.NONE); - } - @Test public void testPreempted() throws Exception { + setUpHost(HOST_A, RACK_A); + schedulingFilter = createMock(SchedulingFilter.class); ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A); runOnHost(lowPriority, HOST_A); @@ -168,8 +167,6 @@ public class PreemptorImplTest extends EasyMockTest { expectGetPendingTasks(highPriority); expectGetActiveTasks(lowPriority); - 
expectGetMaintenance(HOST_A); - expectFiltering(); expectPreempted(lowPriority); @@ -179,6 +176,8 @@ public class PreemptorImplTest extends EasyMockTest { @Test public void testLowestPriorityPreempted() throws Exception { + setUpHost(HOST_A, RACK_A); + schedulingFilter = createMock(SchedulingFilter.class); ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 10); runOnHost(lowPriority, HOST_A); @@ -194,7 +193,6 @@ public class PreemptorImplTest extends EasyMockTest { expectGetPendingTasks(highPriority); expectGetActiveTasks(lowerPriority, lowerPriority); - expectGetMaintenance(HOST_A); expectFiltering(); expectPreempted(lowerPriority); @@ -204,6 +202,8 @@ public class PreemptorImplTest extends EasyMockTest { @Test public void testOnePreemptableTask() throws Exception { + setUpHost(HOST_A, RACK_A); + schedulingFilter = createMock(SchedulingFilter.class); ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 100); runOnHost(highPriority, HOST_A); @@ -222,7 +222,6 @@ public class PreemptorImplTest extends EasyMockTest { expectGetPendingTasks(pendingPriority); expectGetActiveTasks(highPriority, lowerPriority, lowestPriority); - expectGetMaintenance(HOST_A); expectFiltering(); expectPreempted(lowestPriority); @@ -250,6 +249,8 @@ public class PreemptorImplTest extends EasyMockTest { @Test public void testProductionPreemptingNonproduction() throws Exception { + setUpHost(HOST_A, RACK_A); + schedulingFilter = createMock(SchedulingFilter.class); // Use a very low priority for the production task to show that priority is irrelevant. 
ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000); @@ -263,7 +264,6 @@ public class PreemptorImplTest extends EasyMockTest { expectGetPendingTasks(p1); expectGetActiveTasks(a1); - expectGetMaintenance(HOST_A); expectFiltering(); expectPreempted(a1); @@ -273,6 +273,8 @@ public class PreemptorImplTest extends EasyMockTest { @Test public void testProductionPreemptingNonproductionAcrossUsers() throws Exception { + setUpHost(HOST_A, RACK_A); + schedulingFilter = createMock(SchedulingFilter.class); // Use a very low priority for the production task to show that priority is irrelevant. ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000); @@ -286,7 +288,6 @@ public class PreemptorImplTest extends EasyMockTest { expectGetPendingTasks(p1); expectGetActiveTasks(a1); - expectGetMaintenance(HOST_A); expectFiltering(); expectPreempted(a1); @@ -315,7 +316,7 @@ public class PreemptorImplTest extends EasyMockTest { // Ensures a production task can preempt 2 tasks on the same host. @Test public void testProductionPreemptingManyNonProduction() throws Exception { - schedulingFilter = new SchedulingFilterImpl(storageUtil.storage); + schedulingFilter = new SchedulingFilterImpl(); ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); @@ -347,7 +348,7 @@ public class PreemptorImplTest extends EasyMockTest { // Ensures we select the minimal number of tasks to preempt @Test public void testMinimalSetPreempted() throws Exception { - schedulingFilter = new SchedulingFilterImpl(storageUtil.storage); + schedulingFilter = new SchedulingFilterImpl(); ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); a1.getAssignedTask().getTask().setNumCpus(4).setRamMb(4096); @@ -382,7 +383,7 @@ public class PreemptorImplTest extends EasyMockTest { // Ensures a production task *never* preempts a production task from another job. 
@Test public void testProductionJobNeverPreemptsProductionJob() throws Exception { - schedulingFilter = new SchedulingFilterImpl(storageUtil.storage); + schedulingFilter = new SchedulingFilterImpl(); ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1"); p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024); @@ -407,7 +408,7 @@ public class PreemptorImplTest extends EasyMockTest { // Ensures that we can preempt if a task + offer can satisfy a pending task. @Test public void testPreemptWithOfferAndTask() throws Exception { - schedulingFilter = new SchedulingFilterImpl(storageUtil.storage); + schedulingFilter = new SchedulingFilterImpl(); setUpHost(HOST_A, RACK_A); @@ -435,7 +436,7 @@ public class PreemptorImplTest extends EasyMockTest { // Ensures we can preempt if two tasks and an offer can satisfy a pending task. @Test public void testPreemptWithOfferAndMultipleTasks() throws Exception { - schedulingFilter = new SchedulingFilterImpl(storageUtil.storage); + schedulingFilter = new SchedulingFilterImpl(); setUpHost(HOST_A, RACK_A); @@ -468,7 +469,7 @@ public class PreemptorImplTest extends EasyMockTest { // Ensures we don't preempt if a host has enough slack to satisfy a pending task. 
@Test public void testPreemptWithLargeOffer() throws Exception { - schedulingFilter = new SchedulingFilterImpl(storageUtil.storage); + schedulingFilter = new SchedulingFilterImpl(); setUpHost(HOST_A, RACK_A); @@ -524,7 +525,7 @@ public class PreemptorImplTest extends EasyMockTest { schedulingFilter, PREEMPTION_DELAY, clock, - maintenance); + statsProvider); assertEquals( Optional.absent(), @@ -556,7 +557,9 @@ public class PreemptorImplTest extends EasyMockTest { .transform(new Function<Offer, HostOffer>() { @Override public HostOffer apply(Offer offer) { - return new HostOffer(offer, MaintenanceMode.NONE); + return new HostOffer( + offer, + IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE))); } }); expect(offerQueue.getOffers()).andReturn(hostOffers); @@ -569,8 +572,7 @@ public class PreemptorImplTest extends EasyMockTest { private IExpectationSetters> expectFiltering() { return expect(schedulingFilter.filter( EasyMock.anyObject(), - EasyMock.eq(HOST_A), - EasyMock.eq(MaintenanceMode.NONE), + EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.eq(emptyJob))).andAnswer( @@ -659,6 +661,5 @@ public class PreemptorImplTest extends EasyMockTest { expect(this.storageUtil.attributeStore.getHostAttributes(host)) .andReturn(Optional.of(hostAttrs)).anyTimes(); - expect(this.maintenance.getMode(host)).andReturn(NONE).anyTimes(); } } http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java index 0e699c9..0e8a98c 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java @@ -29,11 +29,13 @@ import 
com.twitter.common.util.Clock; import com.twitter.common.util.testing.FakeClock; import org.apache.aurora.gen.AssignedTask; +import org.apache.aurora.gen.HostAttributes; import org.apache.aurora.gen.Identity; import org.apache.aurora.gen.JobKey; import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskConfig; +import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Tasks; @@ -48,6 +50,7 @@ import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import org.apache.aurora.scheduler.storage.Storage.MutateWork; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.mem.MemStorage; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; @@ -58,7 +61,6 @@ import org.junit.Test; import static org.apache.aurora.gen.ScheduleStatus.PENDING; import static org.apache.aurora.gen.ScheduleStatus.THROTTLED; -import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer; import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; @@ -69,8 +71,9 @@ public class TaskSchedulerImplTest extends EasyMockTest { private static final IScheduledTask TASK_A = makeTask("a"); private static final IScheduledTask TASK_B = makeTask("b"); - private static final HostOffer OFFER = - new HostOffer(Offers.makeOffer("OFFER_A", "HOST_A"), MaintenanceMode.NONE); + private static final HostOffer OFFER = new HostOffer( + Offers.makeOffer("OFFER_A", "HOST_A"), + IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE))); private 
StorageTestUtil storageUtil; private StateManager stateManager;