Return-Path: X-Original-To: apmail-aurora-commits-archive@minotaur.apache.org Delivered-To: apmail-aurora-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F1CFD17CB3 for ; Fri, 20 Mar 2015 00:03:19 +0000 (UTC) Received: (qmail 39393 invoked by uid 500); 20 Mar 2015 00:02:45 -0000 Delivered-To: apmail-aurora-commits-archive@aurora.apache.org Received: (qmail 39361 invoked by uid 500); 20 Mar 2015 00:02:45 -0000 Mailing-List: contact commits-help@aurora.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.incubator.apache.org Delivered-To: mailing list commits@aurora.incubator.apache.org Received: (qmail 39352 invoked by uid 99); 20 Mar 2015 00:02:45 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Mar 2015 00:02:45 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 20 Mar 2015 00:02:42 +0000 Received: (qmail 37408 invoked by uid 99); 20 Mar 2015 00:02:22 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Mar 2015 00:02:22 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1AD04E192E; Fri, 20 Mar 2015 00:02:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: maxim@apache.org To: commits@aurora.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: incubator-aurora git commit: Making preemptor asynchronous. Part 2 - async handling. Date: Fri, 20 Mar 2015 00:02:22 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-aurora Updated Branches: refs/heads/master 6396410f7 -> 465086e59 Making preemptor asynchronous. Part 2 - async handling. Bugs closed: AURORA-1158 Reviewed at https://reviews.apache.org/r/32220/ Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/465086e5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/465086e5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/465086e5 Branch: refs/heads/master Commit: 465086e5926140591c5b319d80bf8c1af27463ee Parents: 6396410 Author: Maxim Khutornenko Authored: Thu Mar 19 17:01:38 2015 -0700 Committer: Maxim Khutornenko Committed: Thu Mar 19 17:01:38 2015 -0700 ---------------------------------------------------------------------- .../aurora/scheduler/async/OfferManager.java | 13 ++ .../async/preemptor/PreemptionSlotCache.java | 99 +++++++++++++++ .../async/preemptor/PreemptionSlotFinder.java | 42 ++++++- .../async/preemptor/PreemptorImpl.java | 116 +++++++++++++++--- .../async/preemptor/PreemptorMetrics.java | 48 +++++--- .../async/preemptor/PreemptorModule.java | 15 +++ .../preemptor/PreemptionSlotCacheTest.java | 66 ++++++++++ .../async/preemptor/PreemptorImplTest.java | 120 ++++++++++++++++--- .../preemptor/PreemptorSlotFinderTest.java | 115 ++++++++---------- 9 files changed, 515 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/465086e5/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java index 7d2cb46..e60d01e 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java +++ b/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java @@ -107,6 +107,14 @@ public interface OfferManager extends EventSubscriber { Iterable getOffers(); /** + * Gets an offer for the given slave ID. + * + * @param slaveId Slave ID to get offer for. + * @return An offer for the slave ID. + */ + Optional getOffer(SlaveID slaveId); + + /** * Calculates the amount of time before an offer should be 'returned' by declining it. * The delay is calculated for each offer that is received, so the return delay may be * fixed or variable. @@ -207,6 +215,11 @@ public interface OfferManager extends EventSubscriber { return hostOffers.getWeaklyConsistentOffers(); } + @Override + public Optional getOffer(SlaveID slaveId) { + return hostOffers.get(slaveId); + } + /** * Updates the preference of a host's offers. * http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/465086e5/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java new file mode 100644 index 0000000..4ca36e5 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java @@ -0,0 +1,99 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.async.preemptor; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; +import java.util.concurrent.TimeUnit; + +import javax.inject.Inject; +import javax.inject.Qualifier; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Supplier; +import com.google.common.base.Ticker; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.stats.StatsProvider; +import com.twitter.common.util.Clock; + +import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; +import static java.util.Objects.requireNonNull; + +/** + * Caches preemption slots found for candidate tasks. Entries are purged from cache after #duration. + */ +class PreemptionSlotCache { + + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + @interface PreemptionSlotHoldDuration { } + + @VisibleForTesting + static final String PREEMPTION_SLOT_CACHE_SIZE_STAT = "preemption_slot_cache_size"; + + private final Cache slots; + + @Inject + PreemptionSlotCache( + StatsProvider statsProvider, + @PreemptionSlotHoldDuration Amount duration, + final Clock clock) { + + requireNonNull(duration); + requireNonNull(clock); + this.slots = CacheBuilder.newBuilder() + .expireAfterWrite(duration.as(Time.MINUTES), TimeUnit.MINUTES) + .ticker(new Ticker() { + @Override + public long read() { + return clock.nowNanos(); + } + }) + .build(); + + statsProvider.makeGauge( + PREEMPTION_SLOT_CACHE_SIZE_STAT, + new Supplier() { + @Override + public Long get() { + return slots.size(); + } + }); + } + + void add(String taskId, PreemptionSlot preemptionSlot) { + requireNonNull(taskId); + requireNonNull(preemptionSlot); + slots.put(taskId, preemptionSlot); + } + + Optional get(String taskId) { + return Optional.fromNullable(slots.getIfPresent(taskId)); + } + + void remove(String taskId) { + slots.invalidate(taskId); + } +} http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/465086e5/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java index e748b42..84bcdc5 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java +++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java @@ -37,11 +37,13 @@ import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; +import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; import org.apache.aurora.scheduler.mesos.ExecutorSettings; import org.apache.aurora.scheduler.storage.Storage.StoreProvider; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.apache.mesos.Protos.SlaveID; import static java.util.Objects.requireNonNull; @@ -106,7 +108,7 @@ public interface PreemptionSlotFinder { } /** - * Searches for a {@link PreemptionSlot} for a given {@code taskId}. + * Searches for a {@link PreemptionSlot} for a given {@code pendingTask}. * * @param pendingTask Task to search preemption slot for. * @param attributeAggregate An {@link AttributeAggregate} instance for the task's job. @@ -118,6 +120,21 @@ public interface PreemptionSlotFinder { AttributeAggregate attributeAggregate, StoreProvider storeProvider); + /** + * Validates that a previously-found {@code preemptionSlot} can still accommodate a given task. + * + * @param pendingTask Task to validate preemption slot for. + * @param attributeAggregate An {@link AttributeAggregate} instance for the task's job. + * @param preemptionSlot A previously found preemption slot to validate. + * @param storeProvider A store provider to access task data. + * @return A finalized set of {@code PreemptionVictim} instances to preempt for a given task. + */ + Optional> validatePreemptionSlotFor( + IAssignedTask pendingTask, + AttributeAggregate attributeAggregate, + PreemptionSlot preemptionSlot, + StoreProvider storeProvider); + class PreemptionSlotFinderImpl implements PreemptionSlotFinder { private final OfferManager offerManager; private final ClusterState clusterState; @@ -198,8 +215,6 @@ public interface PreemptionSlotFinder { return Optional.absent(); } - metrics.recordPreemptionAttemptFor(pendingTask.getTask()); - // Group the offers by slave id so they can be paired with active tasks from the same slave. Multimap slavesToOffers = Multimaps.index(offerManager.getOffers(), OFFER_TO_SLAVE_ID); @@ -222,10 +237,27 @@ public interface PreemptionSlotFinder { } } - metrics.recordPreemptionFailure(pendingTask.getTask()); return Optional.absent(); } + @Override + public Optional> validatePreemptionSlotFor( + IAssignedTask pendingTask, + AttributeAggregate attributeAggregate, + PreemptionSlot preemptionSlot, + StoreProvider storeProvider) { + + Optional offer = + offerManager.getOffer(SlaveID.newBuilder().setValue(preemptionSlot.getSlaveId()).build()); + + return getTasksToPreempt( + preemptionSlot.getVictims(), + offer.asSet(), + pendingTask, + attributeAggregate, + storeProvider); + } + /** * Optional.absent indicates that this slave does not have enough resources to satisfy the task. * A set with elements indicates those tasks and the offers are enough. @@ -273,7 +305,7 @@ public interface PreemptionSlotFinder { ResourceSlot.sum(Iterables.transform(toPreemptTasks, victimToResources)), slackResources); - Set vetoes = schedulingFilter.filter( + Set vetoes = schedulingFilter.filter( new UnusedResource(totalResource, attributes.get()), new ResourceRequest(pendingTask.getTask(), pendingTask.getTaskId(), jobState)); http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/465086e5/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java index 1808b71..18a2e60 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java @@ -15,6 +15,7 @@ package org.apache.aurora.scheduler.async.preemptor; import java.lang.annotation.Retention; import java.lang.annotation.Target; +import java.util.concurrent.ScheduledExecutorService; import javax.inject.Inject; import javax.inject.Qualifier; @@ -23,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Time; @@ -35,8 +37,10 @@ import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.StoreProvider; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import static java.lang.annotation.ElementType.FIELD; import static java.lang.annotation.ElementType.METHOD; @@ -60,6 +64,8 @@ public class PreemptorImpl implements Preemptor { private final PreemptionSlotFinder preemptionSlotFinder; private final PreemptorMetrics metrics; private final Amount preemptionCandidacyDelay; + private final ScheduledExecutorService executor; + private final PreemptionSlotCache slotCache; private final Clock clock; /** @@ -73,6 +79,11 @@ public class PreemptorImpl implements Preemptor { @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) public @interface PreemptionDelay { } + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + public @interface PreemptionExecutor { } + @Inject PreemptorImpl( Storage storage, @@ -80,6 +91,8 @@ public class PreemptorImpl implements Preemptor { PreemptionSlotFinder preemptionSlotFinder, PreemptorMetrics metrics, @PreemptionDelay Amount preemptionCandidacyDelay, + @PreemptionExecutor ScheduledExecutorService executor, + PreemptionSlotCache slotCache, Clock clock) { this.storage = requireNonNull(storage); @@ -87,6 +100,8 @@ public class PreemptorImpl implements Preemptor { this.preemptionSlotFinder = requireNonNull(preemptionSlotFinder); this.metrics = requireNonNull(metrics); this.preemptionCandidacyDelay = requireNonNull(preemptionCandidacyDelay); + this.executor = requireNonNull(executor); + this.slotCache = requireNonNull(slotCache); this.clock = requireNonNull(clock); } @@ -95,6 +110,26 @@ public class PreemptorImpl implements Preemptor { final String taskId, final AttributeAggregate attributeAggregate) { + final Optional preemptionSlot = slotCache.get(taskId); + if (preemptionSlot.isPresent()) { + // A preemption slot is available -> attempt to preempt tasks. + slotCache.remove(taskId); + return preemptTasks(taskId, preemptionSlot.get(), attributeAggregate); + } else { + // TODO(maxim): There is a potential race between preemption requests and async search. + // The side-effect of the race is benign as it only wastes CPU time and is unlikely to happen + // often given our schedule penalty >> slot search time. However, we may want to re-evaluate + // this when moving preemptor into background mode. + searchForPreemptionSlot(taskId, attributeAggregate); + return Optional.absent(); + } + } + + private Optional preemptTasks( + final String taskId, + final PreemptionSlot preemptionSlot, + final AttributeAggregate attributeAggregate) { + return storage.write(new Storage.MutateWork.Quiet>() { @Override public Optional apply(Storage.MutableStoreProvider storeProvider) { @@ -102,29 +137,72 @@ public class PreemptorImpl implements Preemptor { // Task is no longer PENDING no need to preempt. if (!pendingTask.isPresent()) { + return Optional.absent(); + } + + // Validate a PreemptionSlot is still valid for the given task. + Optional> validatedVictims = + preemptionSlotFinder.validatePreemptionSlotFor( + pendingTask.get(), + attributeAggregate, + preemptionSlot, + storeProvider); + + metrics.recordSlotValidationResult(validatedVictims); + if (!validatedVictims.isPresent()) { + // Previously found victims are no longer valid -> trigger a new search. + searchForPreemptionSlot(taskId, attributeAggregate); return Optional.absent(); } - // TODO(maxim): Move preemption slot search into a read-only transaction and validate - // weakly-consistent slot data before making a preemption. - Optional preemptionSlot = preemptionSlotFinder.findPreemptionSlotFor( - pendingTask.get(), - attributeAggregate, - storeProvider); - - if (preemptionSlot.isPresent()) { - for (PreemptionVictim toPreempt : preemptionSlot.get().getVictims()) { - metrics.recordTaskPreemption(toPreempt); - stateManager.changeState( - storeProvider, - toPreempt.getTaskId(), - Optional.absent(), - PREEMPTING, - Optional.of("Preempting in favor of " + taskId)); - } - return Optional.of(preemptionSlot.get().getSlaveId()); + for (PreemptionVictim toPreempt : validatedVictims.get()) { + metrics.recordTaskPreemption(toPreempt); + stateManager.changeState( + storeProvider, + toPreempt.getTaskId(), + Optional.absent(), + PREEMPTING, + Optional.of("Preempting in favor of " + taskId)); + } + return Optional.of(preemptionSlot.getSlaveId()); + } + }); + } + + private void searchForPreemptionSlot( + final String taskId, + final AttributeAggregate attributeAggregate) { + + executor.execute(new Runnable() { + @Override + public void run() { + Optional slot = storage.read( + new Storage.Work.Quiet>() { + @Override + public Optional apply(StoreProvider storeProvider) { + Optional pendingTask = fetchIdlePendingTask(taskId, storeProvider); + + // Task is no longer PENDING no need to search for preemption slot. + if (!pendingTask.isPresent()) { + return Optional.absent(); + } + + ITaskConfig task = pendingTask.get().getTask(); + metrics.recordPreemptionAttemptFor(task); + + Optional result = preemptionSlotFinder.findPreemptionSlotFor( + pendingTask.get(), + attributeAggregate, + storeProvider); + + metrics.recordSlotSearchResult(result, task); + return result; + } + }); + + if (slot.isPresent()) { + slotCache.add(taskId, slot.get()); } - return Optional.absent(); } }); } http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/465086e5/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java index 801a6d7..782e751 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java +++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java @@ -18,6 +18,7 @@ import java.util.Set; import javax.inject.Inject; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; import org.apache.aurora.scheduler.stats.CachedCounters; @@ -42,10 +43,14 @@ public class PreemptorMetrics { assertFullyExported(); } - private static String name(boolean production) { + private static String prod(boolean production) { return production ? "prod" : "non_prod"; } + private static String result(boolean success) { + return success ? "successful" : "failed"; + } + private void assertFullyExported() { if (exported) { return; @@ -57,8 +62,12 @@ public class PreemptorMetrics { attemptsStatName(true), successStatName(false), successStatName(true), - failureStatName(false), - failureStatName(true), + slotSearchStatName(true, false), + slotSearchStatName(false, false), + slotSearchStatName(true, true), + slotSearchStatName(false, true), + slotValidationStatName(true), + slotValidationStatName(false), MISSING_ATTRIBUTES_NAME); for (String stat : allStats) { counters.get(stat); @@ -74,32 +83,41 @@ public class PreemptorMetrics { @VisibleForTesting static String attemptsStatName(boolean production) { - return "preemptor_attempts_for_" + name(production); + return "preemptor_slot_search_attempts_for_" + prod(production); } - void recordPreemptionAttemptFor(ITaskConfig task) { - increment(attemptsStatName(task.isProduction())); + @VisibleForTesting + static String successStatName(boolean production) { + return "preemptor_tasks_preempted_" + prod(production); } @VisibleForTesting - static String successStatName(boolean production) { - return "preemptor_tasks_preempted_" + name(production); + static String slotSearchStatName(boolean success, boolean production) { + return String.format("preemptor_slot_search_%s_for_%s", result(success), prod(production)); + } + + @VisibleForTesting + static String slotValidationStatName(boolean success) { + return "preemptor_slot_validation_" + result(success); + } + + void recordPreemptionAttemptFor(ITaskConfig task) { + increment(attemptsStatName(task.isProduction())); } void recordTaskPreemption(PreemptionVictim victim) { increment(successStatName(victim.isProduction())); } - @VisibleForTesting - static String failureStatName(boolean production) { - return "preemptor_no_slots_found_for_" + name(production); + void recordSlotSearchResult(Optional result, ITaskConfig task) { + increment(slotSearchStatName(result.isPresent(), task.isProduction())); } - void recordMissingAttributes() { - increment(MISSING_ATTRIBUTES_NAME); + void recordSlotValidationResult(Optional result) { + increment(slotValidationStatName(result.isPresent())); } - void recordPreemptionFailure(ITaskConfig task) { - increment(failureStatName(task.isProduction())); + void recordMissingAttributes() { + increment(MISSING_ATTRIBUTES_NAME); } } http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/465086e5/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java index dbfebf9..f817ccd 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java +++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java @@ -13,6 +13,7 @@ */ package org.apache.aurora.scheduler.async.preemptor; +import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Logger; import javax.inject.Singleton; @@ -31,6 +32,8 @@ import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.Preempti import org.apache.aurora.scheduler.events.PubsubEventModule; import org.apache.aurora.scheduler.filter.AttributeAggregate; +import static org.apache.aurora.scheduler.base.AsyncUtil.singleThreadLoggingScheduledExecutor; + public class PreemptorModule extends AbstractModule { private static final Logger LOG = Logger.getLogger(PreemptorModule.class.getName()); @@ -44,6 +47,11 @@ public class PreemptorModule extends AbstractModule { private static final Arg> PREEMPTION_DELAY = Arg.create(Amount.of(10L, Time.MINUTES)); + @CmdLine(name = "preemption_slot_hold_time", + help = "Time to hold a preemption slot found before it is discarded.") + private static final Arg> PREEMPTION_SLOT_HOLD_TIME = + Arg.create(Amount.of(3L, Time.MINUTES)); + private final boolean enablePreemptor; @VisibleForTesting @@ -62,6 +70,9 @@ public class PreemptorModule extends AbstractModule { protected void configure() { if (enablePreemptor) { LOG.info("Preemptor Enabled."); + bind(ScheduledExecutorService.class) + .annotatedWith(PreemptorImpl.PreemptionExecutor.class) + .toInstance(singleThreadLoggingScheduledExecutor("PreemptorProcessor-%d", LOG)); bind(PreemptorMetrics.class).in(Singleton.class); bind(PreemptionSlotFinder.class).to(PreemptionSlotFinderImpl.class); bind(PreemptionSlotFinderImpl.class).in(Singleton.class); @@ -70,6 +81,10 @@ public class PreemptorModule extends AbstractModule { bind(new TypeLiteral>() { }) .annotatedWith(PreemptorImpl.PreemptionDelay.class) .toInstance(PREEMPTION_DELAY.get()); + bind(new TypeLiteral>() { }) + .annotatedWith(PreemptionSlotCache.PreemptionSlotHoldDuration.class) + .toInstance(PREEMPTION_SLOT_HOLD_TIME.get()); + bind(PreemptionSlotCache.class).in(Singleton.class); bind(ClusterState.class).to(ClusterStateImpl.class); bind(ClusterStateImpl.class).in(Singleton.class); expose(ClusterStateImpl.class); http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/465086e5/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCacheTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCacheTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCacheTest.java new file mode 100644 index 0000000..80bd13a --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCacheTest.java @@ -0,0 +1,66 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.async.preemptor; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableSet; + +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.util.testing.FakeClock; + +import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class PreemptionSlotCacheTest { + private static final Amount HOLD_DURATION = Amount.of(1L, Time.MINUTES); + private static final String TASK_ID = "task_id"; + private static final PreemptionSlot SLOT = + new PreemptionSlot(ImmutableSet.of(), "slave_id"); + + private FakeStatsProvider statsProvider; + private FakeClock clock; + private PreemptionSlotCache slotCache; + + @Before + public void setUp() { + statsProvider = new FakeStatsProvider(); + clock = new FakeClock(); + slotCache = new PreemptionSlotCache(statsProvider, HOLD_DURATION, clock); + } + + @Test + public void testExpiration() { + slotCache.add(TASK_ID, SLOT); + assertEquals(Optional.of(SLOT), slotCache.get(TASK_ID)); + assertEquals(1L, statsProvider.getLongValue( + PreemptionSlotCache.PREEMPTION_SLOT_CACHE_SIZE_STAT)); + + clock.advance(HOLD_DURATION); + + assertEquals(Optional.absent(), slotCache.get(TASK_ID)); + } + + @Test + public void testRemoval() { + slotCache.add(TASK_ID, SLOT); + assertEquals(Optional.of(SLOT), slotCache.get(TASK_ID)); + slotCache.remove(TASK_ID); + assertEquals(Optional.absent(), slotCache.get(TASK_ID)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/465086e5/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java index cfbc1a0..d17c4fb 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java @@ -14,6 +14,7 @@ package org.apache.aurora.scheduler.async.preemptor; import java.util.Arrays; +import java.util.concurrent.ScheduledExecutorService; import com.google.common.base.Optional; import com.google.common.base.Suppliers; @@ -23,7 +24,6 @@ import com.google.common.collect.ImmutableSet; import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Time; import com.twitter.common.testing.easymock.EasyMockTest; -import com.twitter.common.util.testing.FakeClock; import org.apache.aurora.gen.AssignedTask; import org.apache.aurora.gen.JobKey; @@ -41,12 +41,16 @@ import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.apache.aurora.scheduler.testing.FakeScheduledExecutor; import org.apache.aurora.scheduler.testing.FakeStatsProvider; import org.easymock.EasyMock; import org.junit.Before; import org.junit.Test; import static org.apache.aurora.gen.ScheduleStatus.PENDING; +import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.attemptsStatName; +import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.slotSearchStatName; +import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.slotValidationStatName; import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.successStatName; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; @@ -58,13 +62,17 @@ public class PreemptorImplTest extends EasyMockTest { private static final Amount PREEMPTION_DELAY = Amount.of(30L, Time.SECONDS); + private static final Optional EMPTY_SLOT = Optional.absent(); + private static final Optional EMPTY_RESULT = Optional.absent(); + private StorageTestUtil storageUtil; private StateManager stateManager; private FakeStatsProvider statsProvider; private PreemptionSlotFinder preemptionSlotFinder; private PreemptorImpl preemptor; private AttributeAggregate attrAggregate; - private FakeClock clock; + private PreemptionSlotCache slotCache; + private FakeScheduledExecutor clock; @Before public void setUp() { @@ -72,8 +80,10 @@ public class PreemptorImplTest extends EasyMockTest { storageUtil.expectOperations(); stateManager = createMock(StateManager.class); preemptionSlotFinder = createMock(PreemptionSlotFinder.class); + slotCache = createMock(PreemptionSlotCache.class); statsProvider = new FakeStatsProvider(); - clock = new FakeClock(); + ScheduledExecutorService executor = createMock(ScheduledExecutorService.class); + clock = FakeScheduledExecutor.scheduleExecutor(executor); attrAggregate = new AttributeAggregate( Suppliers.ofInstance(ImmutableSet.of()), createMock(AttributeStore.class)); @@ -84,18 +94,69 @@ public class PreemptorImplTest extends EasyMockTest { preemptionSlotFinder, new PreemptorMetrics(new CachedCounters(statsProvider)), PREEMPTION_DELAY, + executor, + slotCache, clock); } @Test - public void testPreemption() throws Exception { + public void testSearchSlotSuccessful() throws Exception { ScheduledTask task = makeTask(); + PreemptionSlot slot = createPreemptionSlot(task); + expect(slotCache.get(TASK_ID)).andReturn(EMPTY_SLOT); expectGetPendingTasks(task); - expect(preemptionSlotFinder.findPreemptionSlotFor( - IAssignedTask.build(task.getAssignedTask()), - attrAggregate, - storageUtil.mutableStoreProvider)).andReturn(Optional.of(createPreemptionSlot(task))); + expectSlotSearch(task, Optional.of(slot)); + slotCache.add(TASK_ID, slot); + + control.replay(); + + clock.advance(PREEMPTION_DELAY); + + assertEquals(EMPTY_RESULT, preemptor.attemptPreemptionFor(TASK_ID, attrAggregate)); + assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true))); + assertEquals(1L, statsProvider.getLongValue(slotSearchStatName(true, true))); + assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, true))); + } + + @Test + public void testSearchSlotFailed() throws Exception { + ScheduledTask task = makeTask(); + + expect(slotCache.get(TASK_ID)).andReturn(EMPTY_SLOT); + expectGetPendingTasks(task); + expectSlotSearch(task, EMPTY_SLOT); + + control.replay(); + + clock.advance(PREEMPTION_DELAY); + + assertEquals(EMPTY_RESULT, preemptor.attemptPreemptionFor(TASK_ID, attrAggregate)); + assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true))); + assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, true))); + assertEquals(1L, statsProvider.getLongValue(slotSearchStatName(false, true))); + } + + @Test + public void testSearchSlotTaskNoLongerPending() throws Exception { + expect(slotCache.get(TASK_ID)).andReturn(EMPTY_SLOT); + storageUtil.expectTaskFetch(Query.statusScoped(PENDING).byId(TASK_ID)); + + control.replay(); + + assertEquals(EMPTY_RESULT, preemptor.attemptPreemptionFor(TASK_ID, attrAggregate)); + } + + @Test + public void testPreemptTasksSuccessful() throws Exception { + ScheduledTask task = makeTask(); + PreemptionSlot slot = createPreemptionSlot(task); + + expect(slotCache.get(TASK_ID)).andReturn(Optional.of(slot)); + slotCache.remove(TASK_ID); + expectGetPendingTasks(task); + expectSlotValidation(task, slot, Optional.of(ImmutableSet.of( + PreemptionVictim.fromTask(IAssignedTask.build(task.getAssignedTask()))))); expectPreempted(task); @@ -104,33 +165,60 @@ public class PreemptorImplTest extends EasyMockTest { clock.advance(PREEMPTION_DELAY); assertEquals(Optional.of(SLAVE_ID), preemptor.attemptPreemptionFor(TASK_ID, attrAggregate)); + assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(true))); assertEquals(1L, statsProvider.getLongValue(successStatName(true))); } @Test - public void testNoPreemption() throws Exception { + public void testPreemptTasksValidationFailed() throws Exception { ScheduledTask task = makeTask(); + PreemptionSlot slot = createPreemptionSlot(task); + + expect(slotCache.get(TASK_ID)).andReturn(Optional.of(slot)); + slotCache.remove(TASK_ID); expectGetPendingTasks(task); - expect(preemptionSlotFinder.findPreemptionSlotFor( - IAssignedTask.build(task.getAssignedTask()), - attrAggregate, - storageUtil.mutableStoreProvider)).andReturn(Optional.absent()); + storageUtil.expectTaskFetch(Query.statusScoped(PENDING).byId(TASK_ID)); + expectSlotValidation(task, slot, Optional.>absent()); control.replay(); clock.advance(PREEMPTION_DELAY); - assertEquals(Optional.absent(), preemptor.attemptPreemptionFor(TASK_ID, attrAggregate)); + assertEquals(EMPTY_RESULT, preemptor.attemptPreemptionFor(TASK_ID, attrAggregate)); + assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(false))); assertEquals(0L, statsProvider.getLongValue(successStatName(true))); } @Test - public void testNoPendingTasks() { + public void testPreemptTaskNoLongerPending() throws Exception { + ScheduledTask task = makeTask(); + PreemptionSlot slot = createPreemptionSlot(task); + expect(slotCache.get(TASK_ID)).andReturn(Optional.of(slot)); + slotCache.remove(TASK_ID); storageUtil.expectTaskFetch(Query.statusScoped(PENDING).byId(TASK_ID)); control.replay(); - assertEquals(Optional.absent(), preemptor.attemptPreemptionFor(TASK_ID, attrAggregate)); + assertEquals(EMPTY_RESULT, preemptor.attemptPreemptionFor(TASK_ID, attrAggregate)); + } + + private void expectSlotSearch(ScheduledTask task, Optional slot) { + expect(preemptionSlotFinder.findPreemptionSlotFor( + IAssignedTask.build(task.getAssignedTask()), + attrAggregate, + storageUtil.storeProvider)).andReturn(slot); + } + + private void expectSlotValidation( + ScheduledTask task, + PreemptionSlot slot, + Optional> victims) { + + expect(preemptionSlotFinder.validatePreemptionSlotFor( + IAssignedTask.build(task.getAssignedTask()), + attrAggregate, + slot, + storageUtil.mutableStoreProvider)).andReturn(victims); } private void expectPreempted(ScheduledTask preempted) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/465086e5/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorSlotFinderTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorSlotFinderTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorSlotFinderTest.java index e329358..b80e558 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorSlotFinderTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorSlotFinderTest.java @@ -25,6 +25,7 @@ import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; @@ -59,6 +60,7 @@ import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.apache.mesos.Protos; import org.easymock.EasyMock; import org.easymock.IAnswer; import org.easymock.IExpectationSetters; @@ -68,8 +70,6 @@ import org.junit.Test; import static org.apache.aurora.gen.MaintenanceMode.NONE; import static org.apache.aurora.gen.ScheduleStatus.RUNNING; import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.MISSING_ATTRIBUTES_NAME; -import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.attemptsStatName; -import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.failureStatName; import static org.apache.mesos.Protos.Offer; import static org.apache.mesos.Protos.Resource; import static org.easymock.EasyMock.expect; @@ -144,11 +144,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest { control.replay(); assertSlot(runSlotFinder(highPriority), lowPriority); - - assertEquals(1L, statsProvider.getLongValue(attemptsStatName(false))); - assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true))); - assertEquals(0L, statsProvider.getLongValue(failureStatName(false))); - assertEquals(0L, statsProvider.getLongValue(failureStatName(true))); } @Test @@ -170,11 +165,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest { control.replay(); assertSlot(runSlotFinder(highPriority), lowerPriority); - - assertEquals(1L, statsProvider.getLongValue(attemptsStatName(false))); - assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true))); - assertEquals(0L, statsProvider.getLongValue(failureStatName(false))); - assertEquals(0L, statsProvider.getLongValue(failureStatName(true))); } @Test @@ -199,11 +189,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest { control.replay(); assertSlot(runSlotFinder(pendingPriority), lowestPriority); - - assertEquals(1L, statsProvider.getLongValue(attemptsStatName(false))); - assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true))); - assertEquals(0L, statsProvider.getLongValue(failureStatName(false))); - assertEquals(0L, statsProvider.getLongValue(failureStatName(true))); } @Test @@ -219,11 +204,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest { control.replay(); assertNoSlot(runSlotFinder(task)); - - assertEquals(1L, statsProvider.getLongValue(attemptsStatName(false))); - assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true))); - assertEquals(1L, statsProvider.getLongValue(failureStatName(false))); - assertEquals(0L, statsProvider.getLongValue(failureStatName(true))); } @Test @@ -242,11 +222,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest { control.replay(); assertSlot(runSlotFinder(p1), a1); - - assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false))); - assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true))); - assertEquals(0L, statsProvider.getLongValue(failureStatName(false))); - assertEquals(0L, statsProvider.getLongValue(failureStatName(true))); } @Test @@ -265,11 +240,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest { control.replay(); assertSlot(runSlotFinder(p1), a1); - - assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false))); - assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true))); - assertEquals(0L, statsProvider.getLongValue(failureStatName(false))); - assertEquals(0L, statsProvider.getLongValue(failureStatName(true))); } @Test @@ -284,11 +254,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest { control.replay(); assertNoSlot(runSlotFinder(p1)); - - assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false))); - assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true))); - assertEquals(0L, statsProvider.getLongValue(failureStatName(false))); - assertEquals(1L, statsProvider.getLongValue(failureStatName(true))); } // Ensures a production task can preempt 2 tasks on the same host. @@ -315,11 +280,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest { control.replay(); assertSlot(runSlotFinder(p1), a1, b1); - - assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false))); - assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true))); - assertEquals(0L, statsProvider.getLongValue(failureStatName(false))); - assertEquals(0L, statsProvider.getLongValue(failureStatName(true))); } // Ensures we select the minimal number of tasks to preempt @@ -349,11 +309,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest { control.replay(); assertSlot(runSlotFinder(p1), a1); - - assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false))); - assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true))); - assertEquals(0L, statsProvider.getLongValue(failureStatName(false))); - assertEquals(0L, statsProvider.getLongValue(failureStatName(true))); } // Ensures a production task *never* preempts a production task from another job. @@ -376,11 +331,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest { control.replay(); assertNoSlot(runSlotFinder(p2)); - - assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false))); - assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true))); - assertEquals(0L, statsProvider.getLongValue(failureStatName(false))); - assertEquals(1L, statsProvider.getLongValue(failureStatName(true))); } // Ensures that we can preempt if a task + offer can satisfy a pending task. @@ -404,11 +354,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest { control.replay(); assertSlot(runSlotFinder(p1), a1); - - assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false))); - assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true))); - assertEquals(0L, statsProvider.getLongValue(failureStatName(false))); - assertEquals(0L, statsProvider.getLongValue(failureStatName(true))); } // Ensures we can preempt if two tasks and an offer can satisfy a pending task. @@ -436,11 +381,50 @@ public class PreemptorSlotFinderTest extends EasyMockTest { control.replay(); assertSlot(runSlotFinder(p1), a1, a2); + } + + @Test + public void testPreemptionSlotValidation() { + schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR); + + setUpHost(); + + ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); + a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); + assignToHost(a1); + + Offer o1 = makeOffer(OFFER, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1); + expectOffers(o1); + expect(offerManager.getOffer(Protos.SlaveID.newBuilder().setValue(SLAVE_ID).build())) + .andReturn(Optional.of(Iterables.getOnlyElement(makeOffers(o1)))); + + ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1"); + p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024); + + expectGetClusterState(a1); - assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false))); - assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true))); - assertEquals(0L, statsProvider.getLongValue(failureStatName(false))); - assertEquals(0L, statsProvider.getLongValue(failureStatName(true))); + control.replay(); + + Optional slot = runSlotFinder(p1); + assertSlot(slot, a1); + + PreemptionSlotFinder slotFinder = new PreemptionSlotFinderImpl( + offerManager, + clusterState, + schedulingFilter, + TaskExecutors.NO_OVERHEAD_EXECUTOR, + preemptorMetrics); + + Optional> victims = slotFinder.validatePreemptionSlotFor( + IAssignedTask.build(p1.getAssignedTask()), + emptyJob, + slot.get(), + storageUtil.mutableStoreProvider); + + assertEquals( + Optional.of(ImmutableSet.of(PreemptionVictim.fromTask( + IAssignedTask.build(a1.getAssignedTask())))), + victims); } @Test @@ -562,8 +546,8 @@ public class PreemptorSlotFinderTest extends EasyMockTest { return builder.build(); } - private void expectOffers(Offer... offers) { - Iterable hostOffers = FluentIterable.from(Lists.newArrayList(offers)) + private Iterable makeOffers(Offer... offers) { + return FluentIterable.from(Lists.newArrayList(offers)) .transform(new Function() { @Override public HostOffer apply(Offer offer) { @@ -572,7 +556,10 @@ public class PreemptorSlotFinderTest extends EasyMockTest { IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE))); } }); - expect(offerManager.getOffers()).andReturn(hostOffers); + } + + private void expectOffers(Offer... offers) { + expect(offerManager.getOffers()).andReturn(makeOffers(offers)); } private void expectNoOffers() {