aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject incubator-aurora git commit: Making preemptor asynchronous. Part 2 - async handling.
Date Fri, 20 Mar 2015 00:02:22 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 6396410f7 -> 465086e59


Making preemptor asynchronous. Part 2 - async handling.

Bugs closed: AURORA-1158

Reviewed at https://reviews.apache.org/r/32220/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/465086e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/465086e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/465086e5

Branch: refs/heads/master
Commit: 465086e5926140591c5b319d80bf8c1af27463ee
Parents: 6396410
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Thu Mar 19 17:01:38 2015 -0700
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Thu Mar 19 17:01:38 2015 -0700

----------------------------------------------------------------------
 .../aurora/scheduler/async/OfferManager.java    |  13 ++
 .../async/preemptor/PreemptionSlotCache.java    |  99 +++++++++++++++
 .../async/preemptor/PreemptionSlotFinder.java   |  42 ++++++-
 .../async/preemptor/PreemptorImpl.java          | 116 +++++++++++++++---
 .../async/preemptor/PreemptorMetrics.java       |  48 +++++---
 .../async/preemptor/PreemptorModule.java        |  15 +++
 .../preemptor/PreemptionSlotCacheTest.java      |  66 ++++++++++
 .../async/preemptor/PreemptorImplTest.java      | 120 ++++++++++++++++---
 .../preemptor/PreemptorSlotFinderTest.java      | 115 ++++++++----------
 9 files changed, 515 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/465086e5/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java
index 7d2cb46..e60d01e 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java
@@ -107,6 +107,14 @@ public interface OfferManager extends EventSubscriber {
   Iterable<HostOffer> getOffers();
 
   /**
+   * Gets an offer for the given slave ID.
+   *
+   * @param slaveId Slave ID to get offer for.
+   * @return An offer for the slave ID.
+   */
+  Optional<HostOffer> getOffer(SlaveID slaveId);
+
+  /**
    * Calculates the amount of time before an offer should be 'returned' by declining it.
    * The delay is calculated for each offer that is received, so the return delay may be
    * fixed or variable.
@@ -207,6 +215,11 @@ public interface OfferManager extends EventSubscriber {
       return hostOffers.getWeaklyConsistentOffers();
     }
 
+    @Override
+    public Optional<HostOffer> getOffer(SlaveID slaveId) {
+      return hostOffers.get(slaveId);
+    }
+
     /**
      * Updates the preference of a host's offers.
      *

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/465086e5/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java
b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java
new file mode 100644
index 0000000..4ca36e5
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+import javax.inject.Qualifier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+import com.google.common.base.Ticker;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Caches preemption slots found for candidate tasks. Entries are purged from cache after
#duration.
+ */
+class PreemptionSlotCache {
+
+  @VisibleForTesting
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  @interface PreemptionSlotHoldDuration { }
+
+  @VisibleForTesting
+  static final String PREEMPTION_SLOT_CACHE_SIZE_STAT = "preemption_slot_cache_size";
+
+  private final Cache<String, PreemptionSlotFinder.PreemptionSlot> slots;
+
+  @Inject
+  PreemptionSlotCache(
+      StatsProvider statsProvider,
+      @PreemptionSlotHoldDuration Amount<Long, Time> duration,
+      final Clock clock) {
+
+    requireNonNull(duration);
+    requireNonNull(clock);
+    this.slots = CacheBuilder.newBuilder()
+        .expireAfterWrite(duration.as(Time.MINUTES), TimeUnit.MINUTES)
+        .ticker(new Ticker() {
+          @Override
+          public long read() {
+            return clock.nowNanos();
+          }
+        })
+        .build();
+
+    statsProvider.makeGauge(
+        PREEMPTION_SLOT_CACHE_SIZE_STAT,
+        new Supplier<Long>() {
+          @Override
+          public Long get() {
+            return slots.size();
+          }
+        });
+  }
+
+  void add(String taskId, PreemptionSlot preemptionSlot) {
+    requireNonNull(taskId);
+    requireNonNull(preemptionSlot);
+    slots.put(taskId, preemptionSlot);
+  }
+
+  Optional<PreemptionSlot> get(String taskId) {
+    return Optional.fromNullable(slots.getIfPresent(taskId));
+  }
+
+  void remove(String taskId) {
+    slots.invalidate(taskId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/465086e5/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java
b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java
index e748b42..84bcdc5 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java
@@ -37,11 +37,13 @@ import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.mesos.ExecutorSettings;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.mesos.Protos.SlaveID;
 
 import static java.util.Objects.requireNonNull;
 
@@ -106,7 +108,7 @@ public interface PreemptionSlotFinder {
   }
 
   /**
-   * Searches for a {@link PreemptionSlot} for a given {@code taskId}.
+   * Searches for a {@link PreemptionSlot} for a given {@code pendingTask}.
    *
    * @param pendingTask Task to search preemption slot for.
    * @param attributeAggregate An {@link AttributeAggregate} instance for the task's job.
@@ -118,6 +120,21 @@ public interface PreemptionSlotFinder {
       AttributeAggregate attributeAggregate,
       StoreProvider storeProvider);
 
+  /**
+   * Validates that a previously-found {@code preemptionSlot} can still accommodate a given
task.
+   *
+   * @param pendingTask Task to validate preemption slot for.
+   * @param attributeAggregate An {@link AttributeAggregate} instance for the task's job.
+   * @param preemptionSlot A previously found preemption slot to validate.
+   * @param storeProvider A store provider to access task data.
+   * @return A finalized set of {@code PreemptionVictim} instances to preempt for a given
task.
+   */
+  Optional<ImmutableSet<PreemptionVictim>> validatePreemptionSlotFor(
+      IAssignedTask pendingTask,
+      AttributeAggregate attributeAggregate,
+      PreemptionSlot preemptionSlot,
+      StoreProvider storeProvider);
+
   class PreemptionSlotFinderImpl implements PreemptionSlotFinder {
     private final OfferManager offerManager;
     private final ClusterState clusterState;
@@ -198,8 +215,6 @@ public interface PreemptionSlotFinder {
         return Optional.absent();
       }
 
-      metrics.recordPreemptionAttemptFor(pendingTask.getTask());
-
       // Group the offers by slave id so they can be paired with active tasks from the same
slave.
       Multimap<String, HostOffer> slavesToOffers =
           Multimaps.index(offerManager.getOffers(), OFFER_TO_SLAVE_ID);
@@ -222,10 +237,27 @@ public interface PreemptionSlotFinder {
         }
       }
 
-      metrics.recordPreemptionFailure(pendingTask.getTask());
       return Optional.absent();
     }
 
+    @Override
+    public Optional<ImmutableSet<PreemptionVictim>> validatePreemptionSlotFor(
+        IAssignedTask pendingTask,
+        AttributeAggregate attributeAggregate,
+        PreemptionSlot preemptionSlot,
+        StoreProvider storeProvider) {
+
+      Optional<HostOffer> offer =
+          offerManager.getOffer(SlaveID.newBuilder().setValue(preemptionSlot.getSlaveId()).build());
+
+      return getTasksToPreempt(
+          preemptionSlot.getVictims(),
+          offer.asSet(),
+          pendingTask,
+          attributeAggregate,
+          storeProvider);
+    }
+
     /**
      * Optional.absent indicates that this slave does not have enough resources to satisfy
the task.
      * A set with elements indicates those tasks and the offers are enough.
@@ -273,7 +305,7 @@ public interface PreemptionSlotFinder {
             ResourceSlot.sum(Iterables.transform(toPreemptTasks, victimToResources)),
             slackResources);
 
-        Set<SchedulingFilter.Veto> vetoes = schedulingFilter.filter(
+        Set<Veto> vetoes = schedulingFilter.filter(
             new UnusedResource(totalResource, attributes.get()),
             new ResourceRequest(pendingTask.getTask(), pendingTask.getTaskId(), jobState));
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/465086e5/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java
b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java
index 1808b71..18a2e60 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java
@@ -15,6 +15,7 @@ package org.apache.aurora.scheduler.async.preemptor;
 
 import java.lang.annotation.Retention;
 import java.lang.annotation.Target;
+import java.util.concurrent.ScheduledExecutorService;
 
 import javax.inject.Inject;
 import javax.inject.Qualifier;
@@ -23,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
@@ -35,8 +37,10 @@ import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
 import static java.lang.annotation.ElementType.FIELD;
 import static java.lang.annotation.ElementType.METHOD;
@@ -60,6 +64,8 @@ public class PreemptorImpl implements Preemptor {
   private final PreemptionSlotFinder preemptionSlotFinder;
   private final PreemptorMetrics metrics;
   private final Amount<Long, Time> preemptionCandidacyDelay;
+  private final ScheduledExecutorService executor;
+  private final PreemptionSlotCache slotCache;
   private final Clock clock;
 
   /**
@@ -73,6 +79,11 @@ public class PreemptorImpl implements Preemptor {
   @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
   public @interface PreemptionDelay { }
 
+  @VisibleForTesting
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  public @interface PreemptionExecutor { }
+
   @Inject
   PreemptorImpl(
       Storage storage,
@@ -80,6 +91,8 @@ public class PreemptorImpl implements Preemptor {
       PreemptionSlotFinder preemptionSlotFinder,
       PreemptorMetrics metrics,
       @PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay,
+      @PreemptionExecutor ScheduledExecutorService executor,
+      PreemptionSlotCache slotCache,
       Clock clock) {
 
     this.storage = requireNonNull(storage);
@@ -87,6 +100,8 @@ public class PreemptorImpl implements Preemptor {
     this.preemptionSlotFinder = requireNonNull(preemptionSlotFinder);
     this.metrics = requireNonNull(metrics);
     this.preemptionCandidacyDelay = requireNonNull(preemptionCandidacyDelay);
+    this.executor = requireNonNull(executor);
+    this.slotCache = requireNonNull(slotCache);
     this.clock = requireNonNull(clock);
   }
 
@@ -95,6 +110,26 @@ public class PreemptorImpl implements Preemptor {
       final String taskId,
       final AttributeAggregate attributeAggregate) {
 
+    final Optional<PreemptionSlot> preemptionSlot = slotCache.get(taskId);
+    if (preemptionSlot.isPresent()) {
+      // A preemption slot is available -> attempt to preempt tasks.
+      slotCache.remove(taskId);
+      return preemptTasks(taskId, preemptionSlot.get(), attributeAggregate);
+    } else {
+      // TODO(maxim): There is a potential race between preemption requests and async search.
+      // The side-effect of the race is benign as it only wastes CPU time and is unlikely
to happen
+      // often given our schedule penalty >> slot search time. However, we may want
to re-evaluate
+      // this when moving preemptor into background mode.
+      searchForPreemptionSlot(taskId, attributeAggregate);
+      return Optional.absent();
+    }
+  }
+
+  private Optional<String> preemptTasks(
+      final String taskId,
+      final PreemptionSlot preemptionSlot,
+      final AttributeAggregate attributeAggregate) {
+
     return storage.write(new Storage.MutateWork.Quiet<Optional<String>>() {
       @Override
       public Optional<String> apply(Storage.MutableStoreProvider storeProvider) {
@@ -102,29 +137,72 @@ public class PreemptorImpl implements Preemptor {
 
         // Task is no longer PENDING no need to preempt.
         if (!pendingTask.isPresent()) {
+            return Optional.absent();
+        }
+
+        // Validate a PreemptionSlot is still valid for the given task.
+        Optional<ImmutableSet<PreemptionVictim>> validatedVictims =
+            preemptionSlotFinder.validatePreemptionSlotFor(
+                pendingTask.get(),
+                attributeAggregate,
+                preemptionSlot,
+                storeProvider);
+
+        metrics.recordSlotValidationResult(validatedVictims);
+        if (!validatedVictims.isPresent()) {
+          // Previously found victims are no longer valid -> trigger a new search.
+          searchForPreemptionSlot(taskId, attributeAggregate);
           return Optional.absent();
         }
 
-        // TODO(maxim): Move preemption slot search into a read-only transaction and validate
-        //              weakly-consistent slot data before making a preemption.
-        Optional<PreemptionSlot> preemptionSlot = preemptionSlotFinder.findPreemptionSlotFor(
-            pendingTask.get(),
-            attributeAggregate,
-            storeProvider);
-
-        if (preemptionSlot.isPresent()) {
-          for (PreemptionVictim toPreempt : preemptionSlot.get().getVictims()) {
-            metrics.recordTaskPreemption(toPreempt);
-            stateManager.changeState(
-                storeProvider,
-                toPreempt.getTaskId(),
-                Optional.<ScheduleStatus>absent(),
-                PREEMPTING,
-                Optional.of("Preempting in favor of " + taskId));
-          }
-          return Optional.of(preemptionSlot.get().getSlaveId());
+        for (PreemptionVictim toPreempt : validatedVictims.get()) {
+          metrics.recordTaskPreemption(toPreempt);
+          stateManager.changeState(
+              storeProvider,
+              toPreempt.getTaskId(),
+              Optional.<ScheduleStatus>absent(),
+              PREEMPTING,
+              Optional.of("Preempting in favor of " + taskId));
+        }
+        return Optional.of(preemptionSlot.getSlaveId());
+      }
+    });
+  }
+
+  private void searchForPreemptionSlot(
+      final String taskId,
+      final AttributeAggregate attributeAggregate) {
+
+    executor.execute(new Runnable() {
+      @Override
+      public void run() {
+        Optional<PreemptionSlot> slot = storage.read(
+            new Storage.Work.Quiet<Optional<PreemptionSlot>>() {
+              @Override
+              public Optional<PreemptionSlot> apply(StoreProvider storeProvider) {
+                Optional<IAssignedTask> pendingTask = fetchIdlePendingTask(taskId,
storeProvider);
+
+                // Task is no longer PENDING no need to search for preemption slot.
+                if (!pendingTask.isPresent()) {
+                  return Optional.absent();
+                }
+
+                ITaskConfig task = pendingTask.get().getTask();
+                metrics.recordPreemptionAttemptFor(task);
+
+                Optional<PreemptionSlot> result = preemptionSlotFinder.findPreemptionSlotFor(
+                    pendingTask.get(),
+                    attributeAggregate,
+                    storeProvider);
+
+                metrics.recordSlotSearchResult(result, task);
+                return result;
+              }
+            });
+
+        if (slot.isPresent()) {
+          slotCache.add(taskId, slot.get());
         }
-        return Optional.absent();
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/465086e5/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java
b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java
index 801a6d7..782e751 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java
@@ -18,6 +18,7 @@ import java.util.Set;
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.aurora.scheduler.stats.CachedCounters;
@@ -42,10 +43,14 @@ public class PreemptorMetrics {
     assertFullyExported();
   }
 
-  private static String name(boolean production) {
+  private static String prod(boolean production) {
     return production ? "prod" : "non_prod";
   }
 
+  private static String result(boolean success) {
+    return success ? "successful" : "failed";
+  }
+
   private void assertFullyExported() {
     if (exported) {
       return;
@@ -57,8 +62,12 @@ public class PreemptorMetrics {
         attemptsStatName(true),
         successStatName(false),
         successStatName(true),
-        failureStatName(false),
-        failureStatName(true),
+        slotSearchStatName(true, false),
+        slotSearchStatName(false, false),
+        slotSearchStatName(true, true),
+        slotSearchStatName(false, true),
+        slotValidationStatName(true),
+        slotValidationStatName(false),
         MISSING_ATTRIBUTES_NAME);
     for (String stat : allStats) {
       counters.get(stat);
@@ -74,32 +83,41 @@ public class PreemptorMetrics {
 
   @VisibleForTesting
   static String attemptsStatName(boolean production) {
-    return "preemptor_attempts_for_" + name(production);
+    return "preemptor_slot_search_attempts_for_" + prod(production);
   }
 
-  void recordPreemptionAttemptFor(ITaskConfig task) {
-    increment(attemptsStatName(task.isProduction()));
+  @VisibleForTesting
+  static String successStatName(boolean production) {
+    return "preemptor_tasks_preempted_" + prod(production);
   }
 
   @VisibleForTesting
-  static String successStatName(boolean production) {
-    return "preemptor_tasks_preempted_" + name(production);
+  static String slotSearchStatName(boolean success, boolean production) {
+    return String.format("preemptor_slot_search_%s_for_%s", result(success), prod(production));
+  }
+
+  @VisibleForTesting
+  static String slotValidationStatName(boolean success) {
+    return "preemptor_slot_validation_" + result(success);
+  }
+
+  void recordPreemptionAttemptFor(ITaskConfig task) {
+    increment(attemptsStatName(task.isProduction()));
   }
 
   void recordTaskPreemption(PreemptionVictim victim) {
     increment(successStatName(victim.isProduction()));
   }
 
-  @VisibleForTesting
-  static String failureStatName(boolean production) {
-    return "preemptor_no_slots_found_for_" + name(production);
+  void recordSlotSearchResult(Optional<?> result, ITaskConfig task) {
+    increment(slotSearchStatName(result.isPresent(), task.isProduction()));
   }
 
-  void recordMissingAttributes() {
-    increment(MISSING_ATTRIBUTES_NAME);
+  void recordSlotValidationResult(Optional<?> result) {
+    increment(slotValidationStatName(result.isPresent()));
   }
 
-  void recordPreemptionFailure(ITaskConfig task) {
-    increment(failureStatName(task.isProduction()));
+  void recordMissingAttributes() {
+    increment(MISSING_ATTRIBUTES_NAME);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/465086e5/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
index dbfebf9..f817ccd 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
@@ -13,6 +13,7 @@
  */
 package org.apache.aurora.scheduler.async.preemptor;
 
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.logging.Logger;
 
 import javax.inject.Singleton;
@@ -31,6 +32,8 @@ import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.Preempti
 import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 
+import static org.apache.aurora.scheduler.base.AsyncUtil.singleThreadLoggingScheduledExecutor;
+
 public class PreemptorModule extends AbstractModule {
 
   private static final Logger LOG = Logger.getLogger(PreemptorModule.class.getName());
@@ -44,6 +47,11 @@ public class PreemptorModule extends AbstractModule {
   private static final Arg<Amount<Long, Time>> PREEMPTION_DELAY =
       Arg.create(Amount.of(10L, Time.MINUTES));
 
+  @CmdLine(name = "preemption_slot_hold_time",
+      help = "Time to hold a preemption slot found before it is discarded.")
+  private static final Arg<Amount<Long, Time>> PREEMPTION_SLOT_HOLD_TIME =
+      Arg.create(Amount.of(3L, Time.MINUTES));
+
   private final boolean enablePreemptor;
 
   @VisibleForTesting
@@ -62,6 +70,9 @@ public class PreemptorModule extends AbstractModule {
       protected void configure() {
         if (enablePreemptor) {
           LOG.info("Preemptor Enabled.");
+          bind(ScheduledExecutorService.class)
+              .annotatedWith(PreemptorImpl.PreemptionExecutor.class)
+              .toInstance(singleThreadLoggingScheduledExecutor("PreemptorProcessor-%d", LOG));
           bind(PreemptorMetrics.class).in(Singleton.class);
           bind(PreemptionSlotFinder.class).to(PreemptionSlotFinderImpl.class);
           bind(PreemptionSlotFinderImpl.class).in(Singleton.class);
@@ -70,6 +81,10 @@ public class PreemptorModule extends AbstractModule {
           bind(new TypeLiteral<Amount<Long, Time>>() { })
               .annotatedWith(PreemptorImpl.PreemptionDelay.class)
               .toInstance(PREEMPTION_DELAY.get());
+          bind(new TypeLiteral<Amount<Long, Time>>() { })
+              .annotatedWith(PreemptionSlotCache.PreemptionSlotHoldDuration.class)
+              .toInstance(PREEMPTION_SLOT_HOLD_TIME.get());
+          bind(PreemptionSlotCache.class).in(Singleton.class);
           bind(ClusterState.class).to(ClusterStateImpl.class);
           bind(ClusterStateImpl.class).in(Singleton.class);
           expose(ClusterStateImpl.class);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/465086e5/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCacheTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCacheTest.java
b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCacheTest.java
new file mode 100644
index 0000000..80bd13a
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCacheTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class PreemptionSlotCacheTest {
+  private static final Amount<Long, Time> HOLD_DURATION = Amount.of(1L, Time.MINUTES);
+  private static final String TASK_ID = "task_id";
+  private static final PreemptionSlot SLOT =
+      new PreemptionSlot(ImmutableSet.<PreemptionVictim>of(), "slave_id");
+
+  private FakeStatsProvider statsProvider;
+  private FakeClock clock;
+  private PreemptionSlotCache slotCache;
+
+  @Before
+  public void setUp() {
+    statsProvider = new FakeStatsProvider();
+    clock = new FakeClock();
+    slotCache = new PreemptionSlotCache(statsProvider, HOLD_DURATION, clock);
+  }
+
+  @Test
+  public void testExpiration() {
+    slotCache.add(TASK_ID, SLOT);
+    assertEquals(Optional.of(SLOT), slotCache.get(TASK_ID));
+    assertEquals(1L, statsProvider.getLongValue(
+        PreemptionSlotCache.PREEMPTION_SLOT_CACHE_SIZE_STAT));
+
+    clock.advance(HOLD_DURATION);
+
+    assertEquals(Optional.<PreemptionSlot>absent(), slotCache.get(TASK_ID));
+  }
+
+  @Test
+  public void testRemoval() {
+    slotCache.add(TASK_ID, SLOT);
+    assertEquals(Optional.of(SLOT), slotCache.get(TASK_ID));
+    slotCache.remove(TASK_ID);
+    assertEquals(Optional.<PreemptionSlot>absent(), slotCache.get(TASK_ID));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/465086e5/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
index cfbc1a0..d17c4fb 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
@@ -14,6 +14,7 @@
 package org.apache.aurora.scheduler.async.preemptor;
 
 import java.util.Arrays;
+import java.util.concurrent.ScheduledExecutorService;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Suppliers;
@@ -23,7 +24,6 @@ import com.google.common.collect.ImmutableSet;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.testing.FakeClock;
 
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.JobKey;
@@ -41,12 +41,16 @@ import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.attemptsStatName;
+import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.slotSearchStatName;
+import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.slotValidationStatName;
 import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.successStatName;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
@@ -58,13 +62,17 @@ public class PreemptorImplTest extends EasyMockTest {
 
   private static final Amount<Long, Time> PREEMPTION_DELAY = Amount.of(30L, Time.SECONDS);
 
+  private static final Optional<PreemptionSlot> EMPTY_SLOT = Optional.absent();
+  private static final Optional<String> EMPTY_RESULT = Optional.absent();
+
   private StorageTestUtil storageUtil;
   private StateManager stateManager;
   private FakeStatsProvider statsProvider;
   private PreemptionSlotFinder preemptionSlotFinder;
   private PreemptorImpl preemptor;
   private AttributeAggregate attrAggregate;
-  private FakeClock clock;
+  private PreemptionSlotCache slotCache;
+  private FakeScheduledExecutor clock;
 
   @Before
   public void setUp() {
@@ -72,8 +80,10 @@ public class PreemptorImplTest extends EasyMockTest {
     storageUtil.expectOperations();
     stateManager = createMock(StateManager.class);
     preemptionSlotFinder = createMock(PreemptionSlotFinder.class);
+    slotCache = createMock(PreemptionSlotCache.class);
     statsProvider = new FakeStatsProvider();
-    clock = new FakeClock();
+    ScheduledExecutorService executor = createMock(ScheduledExecutorService.class);
+    clock = FakeScheduledExecutor.scheduleExecutor(executor);
     attrAggregate = new AttributeAggregate(
         Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
         createMock(AttributeStore.class));
@@ -84,18 +94,69 @@ public class PreemptorImplTest extends EasyMockTest {
         preemptionSlotFinder,
         new PreemptorMetrics(new CachedCounters(statsProvider)),
         PREEMPTION_DELAY,
+        executor,
+        slotCache,
         clock);
   }
 
   @Test
-  public void testPreemption() throws Exception {
+  public void testSearchSlotSuccessful() throws Exception {
     ScheduledTask task = makeTask();
+    PreemptionSlot slot = createPreemptionSlot(task);
 
+    expect(slotCache.get(TASK_ID)).andReturn(EMPTY_SLOT);
     expectGetPendingTasks(task);
-    expect(preemptionSlotFinder.findPreemptionSlotFor(
-        IAssignedTask.build(task.getAssignedTask()),
-        attrAggregate,
-        storageUtil.mutableStoreProvider)).andReturn(Optional.of(createPreemptionSlot(task)));
+    expectSlotSearch(task, Optional.of(slot));
+    slotCache.add(TASK_ID, slot);
+
+    control.replay();
+
+    clock.advance(PREEMPTION_DELAY);
+
+    assertEquals(EMPTY_RESULT, preemptor.attemptPreemptionFor(TASK_ID, attrAggregate));
+    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(1L, statsProvider.getLongValue(slotSearchStatName(true, true)));
+    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, true)));
+  }
+
+  @Test
+  public void testSearchSlotFailed() throws Exception {
+    ScheduledTask task = makeTask();
+
+    expect(slotCache.get(TASK_ID)).andReturn(EMPTY_SLOT);
+    expectGetPendingTasks(task);
+    expectSlotSearch(task, EMPTY_SLOT);
+
+    control.replay();
+
+    clock.advance(PREEMPTION_DELAY);
+
+    assertEquals(EMPTY_RESULT, preemptor.attemptPreemptionFor(TASK_ID, attrAggregate));
+    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, true)));
+    assertEquals(1L, statsProvider.getLongValue(slotSearchStatName(false, true)));
+  }
+
+  @Test
+  public void testSearchSlotTaskNoLongerPending() throws Exception {
+    expect(slotCache.get(TASK_ID)).andReturn(EMPTY_SLOT);
+    storageUtil.expectTaskFetch(Query.statusScoped(PENDING).byId(TASK_ID));
+
+    control.replay();
+
+    assertEquals(EMPTY_RESULT, preemptor.attemptPreemptionFor(TASK_ID, attrAggregate));
+  }
+
+  @Test
+  public void testPreemptTasksSuccessful() throws Exception {
+    ScheduledTask task = makeTask();
+    PreemptionSlot slot = createPreemptionSlot(task);
+
+    expect(slotCache.get(TASK_ID)).andReturn(Optional.of(slot));
+    slotCache.remove(TASK_ID);
+    expectGetPendingTasks(task);
+    expectSlotValidation(task, slot, Optional.of(ImmutableSet.of(
+        PreemptionVictim.fromTask(IAssignedTask.build(task.getAssignedTask())))));
 
     expectPreempted(task);
 
@@ -104,33 +165,60 @@ public class PreemptorImplTest extends EasyMockTest {
     clock.advance(PREEMPTION_DELAY);
 
     assertEquals(Optional.of(SLAVE_ID), preemptor.attemptPreemptionFor(TASK_ID, attrAggregate));
+    assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(true)));
     assertEquals(1L, statsProvider.getLongValue(successStatName(true)));
   }
 
   @Test
-  public void testNoPreemption() throws Exception {
+  public void testPreemptTasksValidationFailed() throws Exception {
     ScheduledTask task = makeTask();
+    PreemptionSlot slot = createPreemptionSlot(task);
+
+    expect(slotCache.get(TASK_ID)).andReturn(Optional.of(slot));
+    slotCache.remove(TASK_ID);
     expectGetPendingTasks(task);
-    expect(preemptionSlotFinder.findPreemptionSlotFor(
-        IAssignedTask.build(task.getAssignedTask()),
-        attrAggregate,
-        storageUtil.mutableStoreProvider)).andReturn(Optional.<PreemptionSlot>absent());
+    storageUtil.expectTaskFetch(Query.statusScoped(PENDING).byId(TASK_ID));
+    expectSlotValidation(task, slot, Optional.<ImmutableSet<PreemptionVictim>>absent());
 
     control.replay();
 
     clock.advance(PREEMPTION_DELAY);
 
-    assertEquals(Optional.<String>absent(), preemptor.attemptPreemptionFor(TASK_ID,
attrAggregate));
+    assertEquals(EMPTY_RESULT, preemptor.attemptPreemptionFor(TASK_ID, attrAggregate));
+    assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(false)));
     assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
   }
 
   @Test
-  public void testNoPendingTasks() {
+  public void testPreemptTaskNoLongerPending() throws Exception {
+    ScheduledTask task = makeTask();
+    PreemptionSlot slot = createPreemptionSlot(task);
+    expect(slotCache.get(TASK_ID)).andReturn(Optional.of(slot));
+    slotCache.remove(TASK_ID);
     storageUtil.expectTaskFetch(Query.statusScoped(PENDING).byId(TASK_ID));
 
     control.replay();
 
-    assertEquals(Optional.<String>absent(), preemptor.attemptPreemptionFor(TASK_ID,
attrAggregate));
+    assertEquals(EMPTY_RESULT, preemptor.attemptPreemptionFor(TASK_ID, attrAggregate));
+  }
+
+  private void expectSlotSearch(ScheduledTask task, Optional<PreemptionSlot> slot)
{
+    expect(preemptionSlotFinder.findPreemptionSlotFor(
+        IAssignedTask.build(task.getAssignedTask()),
+        attrAggregate,
+        storageUtil.storeProvider)).andReturn(slot);
+  }
+
+  private void expectSlotValidation(
+      ScheduledTask task,
+      PreemptionSlot slot,
+      Optional<ImmutableSet<PreemptionVictim>> victims) {
+
+    expect(preemptionSlotFinder.validatePreemptionSlotFor(
+        IAssignedTask.build(task.getAssignedTask()),
+        attrAggregate,
+        slot,
+        storageUtil.mutableStoreProvider)).andReturn(victims);
   }
 
   private void expectPreempted(ScheduledTask preempted) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/465086e5/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorSlotFinderTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorSlotFinderTest.java
b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorSlotFinderTest.java
index e329358..b80e558 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorSlotFinderTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorSlotFinderTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
@@ -59,6 +60,7 @@ import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.apache.mesos.Protos;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.easymock.IExpectationSetters;
@@ -68,8 +70,6 @@ import org.junit.Test;
 import static org.apache.aurora.gen.MaintenanceMode.NONE;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.MISSING_ATTRIBUTES_NAME;
-import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.attemptsStatName;
-import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.failureStatName;
 import static org.apache.mesos.Protos.Offer;
 import static org.apache.mesos.Protos.Resource;
 import static org.easymock.EasyMock.expect;
@@ -144,11 +144,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest {
 
     control.replay();
     assertSlot(runSlotFinder(highPriority), lowPriority);
-
-    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(false)));
-    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true)));
-    assertEquals(0L, statsProvider.getLongValue(failureStatName(false)));
-    assertEquals(0L, statsProvider.getLongValue(failureStatName(true)));
   }
 
   @Test
@@ -170,11 +165,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest {
 
     control.replay();
     assertSlot(runSlotFinder(highPriority), lowerPriority);
-
-    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(false)));
-    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true)));
-    assertEquals(0L, statsProvider.getLongValue(failureStatName(false)));
-    assertEquals(0L, statsProvider.getLongValue(failureStatName(true)));
   }
 
   @Test
@@ -199,11 +189,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest {
 
     control.replay();
     assertSlot(runSlotFinder(pendingPriority), lowestPriority);
-
-    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(false)));
-    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true)));
-    assertEquals(0L, statsProvider.getLongValue(failureStatName(false)));
-    assertEquals(0L, statsProvider.getLongValue(failureStatName(true)));
   }
 
   @Test
@@ -219,11 +204,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest {
 
     control.replay();
     assertNoSlot(runSlotFinder(task));
-
-    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(false)));
-    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true)));
-    assertEquals(1L, statsProvider.getLongValue(failureStatName(false)));
-    assertEquals(0L, statsProvider.getLongValue(failureStatName(true)));
   }
 
   @Test
@@ -242,11 +222,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest {
 
     control.replay();
     assertSlot(runSlotFinder(p1), a1);
-
-    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false)));
-    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
-    assertEquals(0L, statsProvider.getLongValue(failureStatName(false)));
-    assertEquals(0L, statsProvider.getLongValue(failureStatName(true)));
   }
 
   @Test
@@ -265,11 +240,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest {
 
     control.replay();
     assertSlot(runSlotFinder(p1), a1);
-
-    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false)));
-    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
-    assertEquals(0L, statsProvider.getLongValue(failureStatName(false)));
-    assertEquals(0L, statsProvider.getLongValue(failureStatName(true)));
   }
 
   @Test
@@ -284,11 +254,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest {
 
     control.replay();
     assertNoSlot(runSlotFinder(p1));
-
-    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false)));
-    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
-    assertEquals(0L, statsProvider.getLongValue(failureStatName(false)));
-    assertEquals(1L, statsProvider.getLongValue(failureStatName(true)));
   }
 
   // Ensures a production task can preempt 2 tasks on the same host.
@@ -315,11 +280,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest {
 
     control.replay();
     assertSlot(runSlotFinder(p1), a1, b1);
-
-    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false)));
-    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
-    assertEquals(0L, statsProvider.getLongValue(failureStatName(false)));
-    assertEquals(0L, statsProvider.getLongValue(failureStatName(true)));
   }
 
   // Ensures we select the minimal number of tasks to preempt
@@ -349,11 +309,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest {
 
     control.replay();
     assertSlot(runSlotFinder(p1), a1);
-
-    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false)));
-    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
-    assertEquals(0L, statsProvider.getLongValue(failureStatName(false)));
-    assertEquals(0L, statsProvider.getLongValue(failureStatName(true)));
   }
 
   // Ensures a production task *never* preempts a production task from another job.
@@ -376,11 +331,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest {
 
     control.replay();
     assertNoSlot(runSlotFinder(p2));
-
-    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false)));
-    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
-    assertEquals(0L, statsProvider.getLongValue(failureStatName(false)));
-    assertEquals(1L, statsProvider.getLongValue(failureStatName(true)));
   }
 
   // Ensures that we can preempt if a task + offer can satisfy a pending task.
@@ -404,11 +354,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest {
 
     control.replay();
     assertSlot(runSlotFinder(p1), a1);
-
-    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false)));
-    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
-    assertEquals(0L, statsProvider.getLongValue(failureStatName(false)));
-    assertEquals(0L, statsProvider.getLongValue(failureStatName(true)));
   }
 
   // Ensures we can preempt if two tasks and an offer can satisfy a pending task.
@@ -436,11 +381,50 @@ public class PreemptorSlotFinderTest extends EasyMockTest {
 
     control.replay();
     assertSlot(runSlotFinder(p1), a1, a2);
+  }
+
+  @Test
+  public void testPreemptionSlotValidation() {
+    schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+
+    setUpHost();
+
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+    assignToHost(a1);
+
+    Offer o1 = makeOffer(OFFER, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1);
+    expectOffers(o1);
+    expect(offerManager.getOffer(Protos.SlaveID.newBuilder().setValue(SLAVE_ID).build()))
+        .andReturn(Optional.of(Iterables.getOnlyElement(makeOffers(o1))));
+
+    ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+    expectGetClusterState(a1);
 
-    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false)));
-    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
-    assertEquals(0L, statsProvider.getLongValue(failureStatName(false)));
-    assertEquals(0L, statsProvider.getLongValue(failureStatName(true)));
+    control.replay();
+
+    Optional<PreemptionSlot> slot = runSlotFinder(p1);
+    assertSlot(slot, a1);
+
+    PreemptionSlotFinder slotFinder = new PreemptionSlotFinderImpl(
+        offerManager,
+        clusterState,
+        schedulingFilter,
+        TaskExecutors.NO_OVERHEAD_EXECUTOR,
+        preemptorMetrics);
+
+    Optional<ImmutableSet<PreemptionVictim>> victims = slotFinder.validatePreemptionSlotFor(
+        IAssignedTask.build(p1.getAssignedTask()),
+        emptyJob,
+        slot.get(),
+        storageUtil.mutableStoreProvider);
+
+    assertEquals(
+        Optional.of(ImmutableSet.of(PreemptionVictim.fromTask(
+            IAssignedTask.build(a1.getAssignedTask())))),
+        victims);
   }
 
   @Test
@@ -562,8 +546,8 @@ public class PreemptorSlotFinderTest extends EasyMockTest {
     return builder.build();
   }
 
-  private void expectOffers(Offer... offers) {
-    Iterable<HostOffer> hostOffers = FluentIterable.from(Lists.newArrayList(offers))
+  private Iterable<HostOffer> makeOffers(Offer... offers) {
+    return FluentIterable.from(Lists.newArrayList(offers))
         .transform(new Function<Offer, HostOffer>() {
           @Override
           public HostOffer apply(Offer offer) {
@@ -572,7 +556,10 @@ public class PreemptorSlotFinderTest extends EasyMockTest {
                 IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE)));
           }
         });
-    expect(offerManager.getOffers()).andReturn(hostOffers);
+  }
+
+  private void expectOffers(Offer... offers) {
+    expect(offerManager.getOffers()).andReturn(makeOffers(offers));
   }
 
   private void expectNoOffers() {


Mime
View raw message