aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject [2/2] aurora git commit: Improving async preemptor efficiency.
Date Wed, 22 Apr 2015 00:27:26 GMT
Improving async preemptor efficiency.

Bugs closed: AURORA-1219

Reviewed at https://reviews.apache.org/r/32597/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/8fd21a1a
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/8fd21a1a
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/8fd21a1a

Branch: refs/heads/master
Commit: 8fd21a1adf4df93166a64f1542120c1af6e77443
Parents: 8ba1b11
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Tue Apr 21 17:24:45 2015 -0700
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Tue Apr 21 17:24:45 2015 -0700

----------------------------------------------------------------------
 .../async/preemptor/PendingTaskProcessor.java   | 164 ++++-
 .../async/preemptor/PreemptionProposal.java     |  66 ++
 .../async/preemptor/PreemptionSlotFinder.java   | 351 ----------
 .../async/preemptor/PreemptionVictimFilter.java | 214 +++++++
 .../scheduler/async/preemptor/Preemptor.java    |  32 +-
 .../async/preemptor/PreemptorMetrics.java       |   6 +-
 .../async/preemptor/PreemptorModule.java        |  10 +-
 .../aurora/scheduler/base/TaskGroupKey.java     |   9 +
 .../aurora/scheduler/app/SchedulerIT.java       |   1 +
 .../preemptor/PendingTaskProcessorTest.java     | 231 +++++--
 .../preemptor/PreemptionVictimFilterTest.java   | 514 +++++++++++++++
 .../async/preemptor/PreemptorImplTest.java      |  56 +-
 .../preemptor/PreemptorSlotFinderTest.java      | 641 -------------------
 13 files changed, 1187 insertions(+), 1108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/8fd21a1a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java
index 00919b7..4427115 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java
@@ -15,22 +15,40 @@ package org.apache.aurora.scheduler.async.preemptor;
 
 import java.lang.annotation.Retention;
 import java.lang.annotation.Target;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import javax.inject.Inject;
 import javax.inject.Qualifier;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
+import com.google.common.collect.Multiset;
+import com.google.common.collect.Sets;
+
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.util.Clock;
 
-import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.async.OfferManager;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.base.Tasks;
@@ -56,10 +74,12 @@ import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
  */
 class PendingTaskProcessor implements Runnable {
   private final Storage storage;
-  private final PreemptionSlotFinder preemptionSlotFinder;
+  private final OfferManager offerManager;
+  private final PreemptionVictimFilter preemptionVictimFilter;
   private final PreemptorMetrics metrics;
   private final Amount<Long, Time> preemptionCandidacyDelay;
-  private final BiCache<PreemptionSlot, TaskGroupKey> slotCache;
+  private final BiCache<PreemptionProposal, TaskGroupKey> slotCache;
+  private final ClusterState clusterState;
   private final Clock clock;
 
   /**
@@ -76,17 +96,21 @@ class PendingTaskProcessor implements Runnable {
   @Inject
   PendingTaskProcessor(
       Storage storage,
-      PreemptionSlotFinder preemptionSlotFinder,
+      OfferManager offerManager,
+      PreemptionVictimFilter preemptionVictimFilter,
       PreemptorMetrics metrics,
       @PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay,
-      BiCache<PreemptionSlot, TaskGroupKey> slotCache,
+      BiCache<PreemptionProposal, TaskGroupKey> slotCache,
+      ClusterState clusterState,
       Clock clock) {
 
     this.storage = requireNonNull(storage);
-    this.preemptionSlotFinder = requireNonNull(preemptionSlotFinder);
+    this.offerManager = requireNonNull(offerManager);
+    this.preemptionVictimFilter = requireNonNull(preemptionVictimFilter);
     this.metrics = requireNonNull(metrics);
     this.preemptionCandidacyDelay = requireNonNull(preemptionCandidacyDelay);
     this.slotCache = requireNonNull(slotCache);
+    this.clusterState = requireNonNull(clusterState);
     this.clock = requireNonNull(clock);
   }
 
@@ -95,42 +119,120 @@ class PendingTaskProcessor implements Runnable {
     metrics.recordTaskProcessorRun();
     storage.read(new Storage.Work.Quiet<Void>() {
       @Override
-      public Void apply(StoreProvider storeProvider) {
-        Multimap<IJobKey, IAssignedTask> pendingTasks = fetchIdlePendingTasks(storeProvider);
+      public Void apply(StoreProvider store) {
+        Multimap<String, PreemptionVictim> slavesToActiveTasks =
+            clusterState.getSlavesToActiveTasks();
+
+        if (slavesToActiveTasks.isEmpty()) {
+          // No preemption victims to consider.
+          return null;
+        }
 
-        for (IJobKey job : pendingTasks.keySet()) {
-          AttributeAggregate jobState = AttributeAggregate.getJobActiveState(storeProvider, job);
+        // Group the offers by slave id so they can be paired with active tasks from the same slave.
+        Map<String, HostOffer> slavesToOffers =
+            Maps.uniqueIndex(offerManager.getOffers(), OFFER_TO_SLAVE_ID);
 
-          for (IAssignedTask pendingTask : pendingTasks.get(job)) {
-            ITaskConfig task = pendingTask.getTask();
-            metrics.recordPreemptionAttemptFor(task);
+        Set<String> allSlaves = Sets.newHashSet(Iterables.concat(
+            slavesToOffers.keySet(),
+            slavesToActiveTasks.keySet()));
 
-            Optional<PreemptionSlot> slot = preemptionSlotFinder.findPreemptionSlotFor(
-                pendingTask,
-                jobState,
-                storeProvider);
+        // The algorithm below attempts to find a reservation for every task group by matching
+        // it against all available slaves until a preemption slot is found. Groups are evaluated
+        // in a round-robin fashion to ensure fairness (e.g.: G1, G2, G3, G1, G2).
+        // A slave is removed from further matching once a reservation is made. Similarly, all
+        // identical task group instances are removed from further iteration if none of the
+        // available slaves could yield a preemption proposal. A consuming iterator is used for
+        // task groups to ensure iteration order is preserved after a task group is removed.
+        LoadingCache<IJobKey, AttributeAggregate> jobStates = attributeCache(store);
+        List<TaskGroupKey> pendingGroups = fetchIdlePendingGroups(store);
+        Iterator<TaskGroupKey> groups = Iterators.consumingIterator(pendingGroups.iterator());
+        while (!pendingGroups.isEmpty()) {
+          boolean matched = false;
+          TaskGroupKey group = groups.next();
+          ITaskConfig task = group.getTask();
 
-            metrics.recordSlotSearchResult(slot, task);
+          metrics.recordPreemptionAttemptFor(task);
+          Iterator<String> slaveIterator = allSlaves.iterator();
+          while (slaveIterator.hasNext()) {
+            String slaveId = slaveIterator.next();
+            Optional<ImmutableSet<PreemptionVictim>> candidates =
+                preemptionVictimFilter.filterPreemptionVictims(
+                    task,
+                    slavesToActiveTasks.get(slaveId),
+                    jobStates.getUnchecked(task.getJob()),
+                    Optional.fromNullable(slavesToOffers.get(slaveId)),
+                    store);
 
-            if (slot.isPresent()) {
-              slotCache.put(slot.get(), TaskGroupKey.from(task));
+            metrics.recordSlotSearchResult(candidates, task);
+            if (candidates.isPresent()) {
+              // Slot found -> remove slave to avoid multiple task reservations.
+              slaveIterator.remove();
+              slotCache.put(new PreemptionProposal(candidates.get(), slaveId), group);
+              matched = true;
+              break;
             }
           }
+          if (!matched) {
+            // No slot found for the group -> remove group and reset group iterator.
+            pendingGroups.removeAll(ImmutableSet.of(group));
+            groups = Iterators.consumingIterator(pendingGroups.iterator());
+          }
         }
         return null;
       }
     });
   }
 
-  private Multimap<IJobKey, IAssignedTask> fetchIdlePendingTasks(StoreProvider store) {
-    return Multimaps.index(
-        FluentIterable
-            .from(store.getTaskStore().fetchTasks(Query.statusScoped(PENDING)))
+  private List<TaskGroupKey> fetchIdlePendingGroups(StoreProvider store) {
+    Multiset<TaskGroupKey> taskGroupCounts = HashMultiset.create(
+        FluentIterable.from(store.getTaskStore().fetchTasks(Query.statusScoped(PENDING)))
             .filter(Predicates.and(isIdleTask, Predicates.not(hasCachedSlot)))
-            .transform(SCHEDULED_TO_ASSIGNED),
-        Tasks.ASSIGNED_TO_JOB_KEY);
+            .transform(Functions.compose(ASSIGNED_TO_GROUP_KEY, SCHEDULED_TO_ASSIGNED)));
+
+    return getPreemptionSequence(taskGroupCounts);
+  }
+
+  /**
+   * Creates execution sequence for pending task groups by interleaving their unique occurrences.
+   * For example: {G1, G1, G1, G2, G2} will be converted into {G1, G2, G1, G2, G1}.
+   *
+   * @param groups Multiset of task groups.
+   * @return A task group execution sequence.
+   */
+  private static List<TaskGroupKey> getPreemptionSequence(Multiset<TaskGroupKey> groups) {
+    Multiset<TaskGroupKey> mutableGroups = HashMultiset.create(groups);
+    List<TaskGroupKey> instructions = Lists.newLinkedList();
+    Set<TaskGroupKey> keys = ImmutableSet.copyOf(groups.elementSet());
+    while (!mutableGroups.isEmpty()) {
+      for (TaskGroupKey key : keys) {
+        if (mutableGroups.contains(key)) {
+          instructions.add(key);
+          mutableGroups.remove(key);
+        }
+      }
+    }
+
+    return instructions;
+  }
+
+  private LoadingCache<IJobKey, AttributeAggregate> attributeCache(final StoreProvider store) {
+    return CacheBuilder.newBuilder().build(CacheLoader.from(
+        new Function<IJobKey, AttributeAggregate>() {
+          @Override
+          public AttributeAggregate apply(IJobKey job) {
+            return AttributeAggregate.getJobActiveState(store, job);
+          }
+        }));
   }
 
+  private static final Function<IAssignedTask, TaskGroupKey> ASSIGNED_TO_GROUP_KEY =
+      new Function<IAssignedTask, TaskGroupKey>() {
+        @Override
+        public TaskGroupKey apply(IAssignedTask task) {
+          return TaskGroupKey.from(task.getTask());
+        }
+      };
+
   private final Predicate<IScheduledTask> hasCachedSlot = new Predicate<IScheduledTask>() {
     @Override
     public boolean apply(IScheduledTask task) {
@@ -145,4 +247,12 @@ class PendingTaskProcessor implements Runnable {
           >= preemptionCandidacyDelay.as(Time.MILLISECONDS);
     }
   };
+
+  private static final Function<HostOffer, String> OFFER_TO_SLAVE_ID =
+      new Function<HostOffer, String>() {
+        @Override
+        public String apply(HostOffer offer) {
+          return offer.getOffer().getSlaveId().getValue();
+        }
+      };
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/8fd21a1a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionProposal.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionProposal.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionProposal.java
new file mode 100644
index 0000000..7a03168
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionProposal.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import java.util.Objects;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A set of tasks proposed for preemption on a given slave.
+ */
+class PreemptionProposal {
+  private final Set<PreemptionVictim> victims;
+  private final String slaveId;
+
+  PreemptionProposal(ImmutableSet<PreemptionVictim> victims, String slaveId) {
+    this.victims = requireNonNull(victims);
+    this.slaveId = requireNonNull(slaveId);
+  }
+
+  Set<PreemptionVictim> getVictims() {
+    return victims;
+  }
+
+  String getSlaveId() {
+    return slaveId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof PreemptionProposal)) {
+      return false;
+    }
+
+    PreemptionProposal other = (PreemptionProposal) o;
+    return Objects.equals(getVictims(), other.getVictims())
+        && Objects.equals(getSlaveId(), other.getSlaveId());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(victims, slaveId);
+  }
+
+  @Override
+  public String toString() {
+    return com.google.common.base.Objects.toStringHelper(this)
+        .add("victims", getVictims())
+        .add("slaveId", getSlaveId())
+        .toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/8fd21a1a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java
deleted file mode 100644
index f16f964..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java
+++ /dev/null
@@ -1,351 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async.preemptor;
-
-import java.util.Objects;
-import java.util.Set;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.Ordering;
-import com.google.common.collect.Sets;
-
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.ResourceSlot;
-import org.apache.aurora.scheduler.async.OfferManager;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.filter.SchedulingFilter;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
-import org.apache.aurora.scheduler.mesos.ExecutorSettings;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.mesos.Protos.SlaveID;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Tries to find a slave with a combination of active tasks (victims) and available offer
- * (slack) resources that can accommodate a given task (candidate), provided victims are preempted.
- * <p>
- * A task may preempt another task if the following conditions hold true:
- * <ol>
- *  <li>The resources reserved for a victim (or a set of victims) are sufficient to satisfy
- *    the candidate.
- *  </li>
- *  <li>Both candidate and victim are owned by the same user and the
- *    {@link ITaskConfig#getPriority} of a victim is lower OR a victim is non-production and the
- *    candidate is production.
- *  </li>
- * </ol>
- */
-public interface PreemptionSlotFinder {
-
-  class PreemptionSlot {
-    private final Set<PreemptionVictim> victims;
-    private final String slaveId;
-
-    @VisibleForTesting
-    PreemptionSlot(ImmutableSet<PreemptionVictim> victims, String slaveId) {
-      this.victims = requireNonNull(victims);
-      this.slaveId = requireNonNull(slaveId);
-    }
-
-    Set<PreemptionVictim> getVictims() {
-      return victims;
-    }
-
-    String getSlaveId() {
-      return slaveId;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof PreemptionSlot)) {
-        return false;
-      }
-
-      PreemptionSlot other = (PreemptionSlot) o;
-      return Objects.equals(getVictims(), other.getVictims())
-          && Objects.equals(getSlaveId(), other.getSlaveId());
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(victims, slaveId);
-    }
-
-    @Override
-    public String toString() {
-      return com.google.common.base.Objects.toStringHelper(this)
-          .add("victims", getVictims())
-          .add("slaveId", getSlaveId())
-          .toString();
-    }
-  }
-
-  /**
-   * Searches for a {@link PreemptionSlot} for a given {@code pendingTask}.
-   *
-   * @param pendingTask Task to search preemption slot for.
-   * @param attributeAggregate An {@link AttributeAggregate} instance for the task's job.
-   * @param storeProvider A store provider to access task data.
-   * @return An instance of {@link PreemptionSlot} if preemption is possible.
-   */
-  Optional<PreemptionSlot> findPreemptionSlotFor(
-      IAssignedTask pendingTask,
-      AttributeAggregate attributeAggregate,
-      StoreProvider storeProvider);
-
-  /**
-   * Validates that a previously-found {@code preemptionSlot} can still accommodate a given task.
-   *
-   * @param pendingTask Task to validate preemption slot for.
-   * @param attributeAggregate An {@link AttributeAggregate} instance for the task's job.
-   * @param preemptionSlot A previously found preemption slot to validate.
-   * @param storeProvider A store provider to access task data.
-   * @return A finalized set of {@code PreemptionVictim} instances to preempt for a given task.
-   */
-  Optional<ImmutableSet<PreemptionVictim>> validatePreemptionSlotFor(
-      IAssignedTask pendingTask,
-      AttributeAggregate attributeAggregate,
-      PreemptionSlot preemptionSlot,
-      StoreProvider storeProvider);
-
-  class PreemptionSlotFinderImpl implements PreemptionSlotFinder {
-    private final OfferManager offerManager;
-    private final ClusterState clusterState;
-    private final SchedulingFilter schedulingFilter;
-    private final ExecutorSettings executorSettings;
-    private final PreemptorMetrics metrics;
-
-    @Inject
-    PreemptionSlotFinderImpl(
-        OfferManager offerManager,
-        ClusterState clusterState,
-        SchedulingFilter schedulingFilter,
-        ExecutorSettings executorSettings,
-        PreemptorMetrics metrics) {
-
-      this.offerManager = requireNonNull(offerManager);
-      this.clusterState = requireNonNull(clusterState);
-      this.schedulingFilter = requireNonNull(schedulingFilter);
-      this.executorSettings = requireNonNull(executorSettings);
-      this.metrics = requireNonNull(metrics);
-    }
-
-    private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
-        new Function<HostOffer, ResourceSlot>() {
-          @Override
-          public ResourceSlot apply(HostOffer offer) {
-            return ResourceSlot.from(offer.getOffer());
-          }
-        };
-
-    private static final Function<HostOffer, String> OFFER_TO_HOST =
-        new Function<HostOffer, String>() {
-          @Override
-          public String apply(HostOffer offer) {
-            return offer.getOffer().getHostname();
-          }
-        };
-
-    private static final Function<PreemptionVictim, String> VICTIM_TO_HOST =
-        new Function<PreemptionVictim, String>() {
-          @Override
-          public String apply(PreemptionVictim victim) {
-            return victim.getSlaveHost();
-          }
-        };
-
-    private final Function<PreemptionVictim, ResourceSlot> victimToResources =
-        new Function<PreemptionVictim, ResourceSlot>() {
-          @Override
-          public ResourceSlot apply(PreemptionVictim victim) {
-            return ResourceSlot.from(victim, executorSettings);
-          }
-        };
-
-    // TODO(zmanji) Consider using Dominant Resource Fairness for ordering instead of the vector
-    // ordering
-    private final Ordering<PreemptionVictim> resourceOrder =
-        ResourceSlot.ORDER.onResultOf(victimToResources).reverse();
-
-    private static final Function<HostOffer, String> OFFER_TO_SLAVE_ID =
-        new Function<HostOffer, String>() {
-          @Override
-          public String apply(HostOffer offer) {
-            return offer.getOffer().getSlaveId().getValue();
-          }
-        };
-
-    // TODO(maxim): This should take pre-computed mappings (e.g. slaveToOffers) to avoid
-    // unnecessary repeated work.
-    @Override
-    public Optional<PreemptionSlot> findPreemptionSlotFor(
-        final IAssignedTask pendingTask,
-        AttributeAggregate attributeAggregate,
-        StoreProvider storeProvider) {
-
-      Multimap<String, PreemptionVictim> slavesToActiveTasks =
-          clusterState.getSlavesToActiveTasks();
-
-      if (slavesToActiveTasks.isEmpty()) {
-        return Optional.absent();
-      }
-
-      // Group the offers by slave id so they can be paired with active tasks from the same slave.
-      Multimap<String, HostOffer> slavesToOffers =
-          Multimaps.index(offerManager.getOffers(), OFFER_TO_SLAVE_ID);
-
-      Set<String> allSlaves = ImmutableSet.<String>builder()
-          .addAll(slavesToOffers.keySet())
-          .addAll(slavesToActiveTasks.keySet())
-          .build();
-
-      for (String slaveId : allSlaves) {
-        final Optional<ImmutableSet<PreemptionVictim>> preemptionVictims = getTasksToPreempt(
-            slavesToActiveTasks.get(slaveId),
-            slavesToOffers.get(slaveId),
-            pendingTask,
-            attributeAggregate,
-            storeProvider);
-
-        if (preemptionVictims.isPresent()) {
-          return Optional.of(new PreemptionSlot(preemptionVictims.get(), slaveId));
-        }
-      }
-
-      return Optional.absent();
-    }
-
-    @Override
-    public Optional<ImmutableSet<PreemptionVictim>> validatePreemptionSlotFor(
-        IAssignedTask pendingTask,
-        AttributeAggregate attributeAggregate,
-        PreemptionSlot preemptionSlot,
-        StoreProvider storeProvider) {
-
-      Optional<HostOffer> offer =
-          offerManager.getOffer(SlaveID.newBuilder().setValue(preemptionSlot.getSlaveId()).build());
-
-      return getTasksToPreempt(
-          preemptionSlot.getVictims(),
-          offer.asSet(),
-          pendingTask,
-          attributeAggregate,
-          storeProvider);
-    }
-
-    /**
-     * Optional.absent indicates that this slave does not have enough resources to satisfy the task.
-     * A set with elements indicates those tasks and the offers are enough.
-     */
-    private Optional<ImmutableSet<PreemptionVictim>> getTasksToPreempt(
-        Iterable<PreemptionVictim> possibleVictims,
-        Iterable<HostOffer> offers,
-        IAssignedTask pendingTask,
-        AttributeAggregate jobState,
-        StoreProvider storeProvider) {
-
-      // This enforces the precondition that all of the resources are from the same host. We need to
-      // get the host for the schedulingFilter.
-      Set<String> hosts = ImmutableSet.<String>builder()
-          .addAll(Iterables.transform(possibleVictims, VICTIM_TO_HOST))
-          .addAll(Iterables.transform(offers, OFFER_TO_HOST)).build();
-
-      ResourceSlot slackResources =
-          ResourceSlot.sum(Iterables.transform(offers, OFFER_TO_RESOURCE_SLOT));
-
-      FluentIterable<PreemptionVictim> preemptableTasks = FluentIterable.from(possibleVictims)
-          .filter(preemptionFilter(pendingTask.getTask()));
-
-      if (preemptableTasks.isEmpty()) {
-        return Optional.absent();
-      }
-
-      Set<PreemptionVictim> toPreemptTasks = Sets.newHashSet();
-
-      Iterable<PreemptionVictim> sortedVictims =
-          resourceOrder.immutableSortedCopy(preemptableTasks);
-
-      Optional<IHostAttributes> attributes =
-          storeProvider.getAttributeStore().getHostAttributes(Iterables.getOnlyElement(hosts));
-
-      if (!attributes.isPresent()) {
-        metrics.recordMissingAttributes();
-        return Optional.absent();
-      }
-
-      for (PreemptionVictim victim : sortedVictims) {
-        toPreemptTasks.add(victim);
-
-        ResourceSlot totalResource = ResourceSlot.sum(
-            ResourceSlot.sum(Iterables.transform(toPreemptTasks, victimToResources)),
-            slackResources);
-
-        Set<Veto> vetoes = schedulingFilter.filter(
-            new UnusedResource(totalResource, attributes.get()),
-            new ResourceRequest(pendingTask.getTask(), jobState));
-
-        if (vetoes.isEmpty()) {
-          return Optional.of(ImmutableSet.copyOf(toPreemptTasks));
-        }
-      }
-      return Optional.absent();
-    }
-
-    /**
-     * Creates a filter that will find tasks that the provided {@code pendingTask} may preempt.
-     *
-     * @param pendingTask A task that is not scheduled to possibly preempt other tasks for.
-     * @return A filter that will compare the priorities and resources required by other tasks
-     *     with {@code preemptableTask}.
-     */
-    private static Predicate<PreemptionVictim> preemptionFilter(final ITaskConfig pendingTask) {
-      return new Predicate<PreemptionVictim>() {
-        @Override
-        public boolean apply(PreemptionVictim possibleVictim) {
-          boolean pendingIsProduction = pendingTask.isProduction();
-          boolean victimIsProduction = possibleVictim.isProduction();
-
-          if (pendingIsProduction && !victimIsProduction) {
-            return true;
-          } else if (pendingIsProduction == victimIsProduction) {
-            // If production flags are equal, preemption is based on priority within the same role.
-            if (pendingTask.getJob().getRole().equals(possibleVictim.getRole())) {
-              return pendingTask.getPriority() > possibleVictim.getPriority();
-            } else {
-              return false;
-            }
-          } else {
-            return false;
-          }
-        }
-      };
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/8fd21a1a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilter.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilter.java
new file mode 100644
index 0000000..75e2370
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilter.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import java.util.Set;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.mesos.ExecutorSettings;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Filters active tasks (victims) and available offer (slack) resources that can accommodate a
+ * given task (candidate), provided victims are preempted.
+ * <p>
+ * A task may preempt another task if the following conditions hold true:
+ * <ol>
+ *  <li>The resources reserved for a victim (or a set of victims) are sufficient to satisfy
+ *    the candidate.
+ *  </li>
+ *  <li>Both candidate and victim have the same role and production flag, and the
+ *    {@link ITaskConfig#getPriority} of a victim is lower OR a victim is non-production and the
+ *    candidate is production.
+ *  </li>
+ * </ol>
+ */
+public interface PreemptionVictimFilter {
+  /**
+   * Returns a set of {@link PreemptionVictim} that can accommodate a given task if preempted.
+   *
+   * @param pendingTask Task to search preemption slot for.
+   * @param victims Active tasks on a slave.
+   * @param attributeAggregate An {@link AttributeAggregate} instance for the task's job.
+   * @param offer A resource offer for a slave.
+   * @param storeProvider A store provider to access task data.
+   * @return Victims to preempt for the given task, or absent if no qualifying set exists.
+   */
+  Optional<ImmutableSet<PreemptionVictim>> filterPreemptionVictims(
+      ITaskConfig pendingTask,
+      Iterable<PreemptionVictim> victims,
+      AttributeAggregate attributeAggregate,
+      Optional<HostOffer> offer,
+      StoreProvider storeProvider);
+
+  class PreemptionVictimFilterImpl implements PreemptionVictimFilter {
+    private final SchedulingFilter schedulingFilter;
+    private final ExecutorSettings executorSettings;
+    private final PreemptorMetrics metrics;
+
+    @Inject
+    PreemptionVictimFilterImpl(
+        SchedulingFilter schedulingFilter,
+        ExecutorSettings executorSettings,
+        PreemptorMetrics metrics) {
+
+      this.schedulingFilter = requireNonNull(schedulingFilter);
+      this.executorSettings = requireNonNull(executorSettings);
+      this.metrics = requireNonNull(metrics);
+    }
+
+    private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
+        new Function<HostOffer, ResourceSlot>() {
+          @Override
+          public ResourceSlot apply(HostOffer offer) {
+            return ResourceSlot.from(offer.getOffer());
+          }
+        };
+
+    private static final Function<HostOffer, String> OFFER_TO_HOST =
+        new Function<HostOffer, String>() {
+          @Override
+          public String apply(HostOffer offer) {
+            return offer.getOffer().getHostname();
+          }
+        };
+
+    private static final Function<PreemptionVictim, String> VICTIM_TO_HOST =
+        new Function<PreemptionVictim, String>() {
+          @Override
+          public String apply(PreemptionVictim victim) {
+            return victim.getSlaveHost();
+          }
+        };
+
+    private final Function<PreemptionVictim, ResourceSlot> victimToResources =
+        new Function<PreemptionVictim, ResourceSlot>() {
+          @Override
+          public ResourceSlot apply(PreemptionVictim victim) {
+            return ResourceSlot.from(victim, executorSettings);
+          }
+        };
+
+    // TODO(zmanji) Consider using Dominant Resource Fairness for ordering instead of the vector
+    // ordering
+    private final Ordering<PreemptionVictim> resourceOrder =
+        ResourceSlot.ORDER.onResultOf(victimToResources).reverse();
+
+    @Override
+    public Optional<ImmutableSet<PreemptionVictim>> filterPreemptionVictims(
+        ITaskConfig pendingTask,
+        Iterable<PreemptionVictim> possibleVictims,
+        AttributeAggregate jobState,
+        Optional<HostOffer> offer,
+        StoreProvider storeProvider) {
+
+      // This enforces the precondition that all of the resources are from the same host. We need to
+      // get the host for the schedulingFilter.
+      Set<String> hosts = ImmutableSet.<String>builder()
+          .addAll(Iterables.transform(possibleVictims, VICTIM_TO_HOST))
+          .addAll(Iterables.transform(offer.asSet(), OFFER_TO_HOST)).build();
+
+      ResourceSlot slackResources =
+          ResourceSlot.sum(Iterables.transform(offer.asSet(), OFFER_TO_RESOURCE_SLOT));
+
+      FluentIterable<PreemptionVictim> preemptableTasks = FluentIterable.from(possibleVictims)
+          .filter(preemptionFilter(pendingTask));
+
+      if (preemptableTasks.isEmpty()) {
+        return Optional.absent();
+      }
+
+      Set<PreemptionVictim> toPreemptTasks = Sets.newHashSet();
+
+      Iterable<PreemptionVictim> sortedVictims =
+          resourceOrder.immutableSortedCopy(preemptableTasks);
+
+      Optional<IHostAttributes> attributes =
+          storeProvider.getAttributeStore().getHostAttributes(Iterables.getOnlyElement(hosts));
+
+      if (!attributes.isPresent()) {
+        metrics.recordMissingAttributes();
+        return Optional.absent();
+      }
+
+      for (PreemptionVictim victim : sortedVictims) {
+        toPreemptTasks.add(victim);
+
+        ResourceSlot totalResource = ResourceSlot.sum(
+            ResourceSlot.sum(Iterables.transform(toPreemptTasks, victimToResources)),
+            slackResources);
+
+        Set<Veto> vetoes = schedulingFilter.filter(
+            new UnusedResource(totalResource, attributes.get()),
+            new ResourceRequest(pendingTask, jobState));
+
+        if (vetoes.isEmpty()) {
+          return Optional.of(ImmutableSet.copyOf(toPreemptTasks));
+        }
+      }
+      return Optional.absent();
+    }
+
+    /**
+     * Creates a filter that will find tasks that the provided {@code pendingTask} may preempt.
+     *
+     * @param pendingTask A pending (not yet scheduled) task that may preempt other tasks.
+     * @return A filter that compares the production flags, roles and priorities of other tasks
+     *     with those of {@code pendingTask}.
+     */
+    private static Predicate<PreemptionVictim> preemptionFilter(final ITaskConfig pendingTask) {
+      return new Predicate<PreemptionVictim>() {
+        @Override
+        public boolean apply(PreemptionVictim possibleVictim) {
+          boolean pendingIsProduction = pendingTask.isProduction();
+          boolean victimIsProduction = possibleVictim.isProduction();
+
+          if (pendingIsProduction && !victimIsProduction) {
+            return true;
+          } else if (pendingIsProduction == victimIsProduction) {
+            // If production flags are equal, preemption is based on priority within the same role.
+            if (pendingTask.getJob().getRole().equals(possibleVictim.getRole())) {
+              return pendingTask.getPriority() > possibleVictim.getPriority();
+            } else {
+              return false;
+            }
+          } else {
+            return false;
+          }
+        }
+      };
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/8fd21a1a/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
index 5200811..41591b8 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
@@ -21,12 +21,13 @@ import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot;
+import org.apache.aurora.scheduler.async.OfferManager;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.mesos.Protos.SlaveID;
 
 import static java.util.Objects.requireNonNull;
 
@@ -52,19 +53,22 @@ public interface Preemptor {
 
   class PreemptorImpl implements Preemptor {
     private final StateManager stateManager;
-    private final PreemptionSlotFinder preemptionSlotFinder;
+    private final OfferManager offerManager;
+    private final PreemptionVictimFilter preemptionVictimFilter;
     private final PreemptorMetrics metrics;
-    private final BiCache<PreemptionSlot, TaskGroupKey> slotCache;
+    private final BiCache<PreemptionProposal, TaskGroupKey> slotCache;
 
     @Inject
     PreemptorImpl(
         StateManager stateManager,
-        PreemptionSlotFinder preemptionSlotFinder,
+        OfferManager offerManager,
+        PreemptionVictimFilter preemptionVictimFilter,
         PreemptorMetrics metrics,
-        BiCache<PreemptionSlot, TaskGroupKey> slotCache) {
+        BiCache<PreemptionProposal, TaskGroupKey> slotCache) {
 
       this.stateManager = requireNonNull(stateManager);
-      this.preemptionSlotFinder = requireNonNull(preemptionSlotFinder);
+      this.offerManager = requireNonNull(offerManager);
+      this.preemptionVictimFilter = requireNonNull(preemptionVictimFilter);
       this.metrics = requireNonNull(metrics);
       this.slotCache = requireNonNull(slotCache);
     }
@@ -76,17 +80,23 @@ public interface Preemptor {
         MutableStoreProvider store) {
 
       TaskGroupKey groupKey = TaskGroupKey.from(pendingTask.getTask());
-      Set<PreemptionSlot> preemptionSlots = slotCache.getByValue(groupKey);
+      Set<PreemptionProposal> preemptionProposals = slotCache.getByValue(groupKey);
 
       // A preemption slot is available -> attempt to preempt tasks.
-      if (!preemptionSlots.isEmpty()) {
+      if (!preemptionProposals.isEmpty()) {
         // Get the next available preemption slot.
-        PreemptionSlot slot = preemptionSlots.iterator().next();
+        PreemptionProposal slot = preemptionProposals.iterator().next();
         slotCache.remove(slot, groupKey);
 
-        // Validate a PreemptionSlot is still valid for the given task.
+        // Validate PreemptionProposal is still valid for the given task.
+        SlaveID slaveId = SlaveID.newBuilder().setValue(slot.getSlaveId()).build();
         Optional<ImmutableSet<PreemptionVictim>> validatedVictims =
-            preemptionSlotFinder.validatePreemptionSlotFor(pendingTask, jobState, slot, store);
+            preemptionVictimFilter.filterPreemptionVictims(
+                pendingTask.getTask(),
+                slot.getVictims(),
+                jobState,
+                offerManager.getOffer(slaveId),
+                store);
 
         metrics.recordSlotValidationResult(validatedVictims);
         if (!validatedVictims.isPresent()) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/8fd21a1a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java
index dc7eb44..22a1533 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java
@@ -35,7 +35,7 @@ public class PreemptorMetrics {
   static final String MISSING_ATTRIBUTES_NAME = "preemptor_missing_attributes";
 
   @VisibleForTesting
-  static final String PENDING_PROCESSOR_RUN_NAME = "preemptor_task_processor_runs";
+  static final String TASK_PROCESSOR_RUN_NAME = "preemptor_task_processor_runs";
 
   private volatile boolean exported = false;
   private final CachedCounters counters;
@@ -72,7 +72,7 @@ public class PreemptorMetrics {
         slotValidationStatName(true),
         slotValidationStatName(false),
         MISSING_ATTRIBUTES_NAME,
-        PENDING_PROCESSOR_RUN_NAME);
+        TASK_PROCESSOR_RUN_NAME);
     for (String stat : allStats) {
       counters.get(stat);
     }
@@ -126,6 +126,6 @@ public class PreemptorMetrics {
   }
 
   void recordTaskProcessorRun() {
-    increment(PENDING_PROCESSOR_RUN_NAME);
+    increment(TASK_PROCESSOR_RUN_NAME);
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/8fd21a1a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
index 7cea881..156bac2 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
@@ -31,7 +31,6 @@ import com.twitter.common.quantity.Time;
 
 import org.apache.aurora.scheduler.SchedulerServicesModule;
 import org.apache.aurora.scheduler.async.preemptor.BiCache.BiCacheSettings;
-import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
@@ -90,9 +89,9 @@ public class PreemptorModule extends AbstractModule {
         if (enablePreemptor) {
           LOG.info("Preemptor Enabled.");
           bind(PreemptorMetrics.class).in(Singleton.class);
-          bind(PreemptionSlotFinder.class)
-              .to(PreemptionSlotFinder.PreemptionSlotFinderImpl.class);
-          bind(PreemptionSlotFinder.PreemptionSlotFinderImpl.class).in(Singleton.class);
+          bind(PreemptionVictimFilter.class)
+              .to(PreemptionVictimFilter.PreemptionVictimFilterImpl.class);
+          bind(PreemptionVictimFilter.PreemptionVictimFilterImpl.class).in(Singleton.class);
           bind(Preemptor.class).to(Preemptor.PreemptorImpl.class);
           bind(Preemptor.PreemptorImpl.class).in(Singleton.class);
           bind(new TypeLiteral<Amount<Long, Time>>() { })
@@ -100,7 +99,8 @@ public class PreemptorModule extends AbstractModule {
               .toInstance(preemptionDelay);
           bind(BiCacheSettings.class).toInstance(
               new BiCacheSettings(PREEMPTION_SLOT_HOLD_TIME.get(), "preemption_slot_cache_size"));
-          bind(new TypeLiteral<BiCache<PreemptionSlot, TaskGroupKey>>() { }).in(Singleton.class);
+          bind(new TypeLiteral<BiCache<PreemptionProposal, TaskGroupKey>>() { })
+              .in(Singleton.class);
           bind(PendingTaskProcessor.class).in(Singleton.class);
           bind(ClusterState.class).to(ClusterStateImpl.class);
           bind(ClusterStateImpl.class).in(Singleton.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/8fd21a1a/src/main/java/org/apache/aurora/scheduler/base/TaskGroupKey.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/TaskGroupKey.java b/src/main/java/org/apache/aurora/scheduler/base/TaskGroupKey.java
index 6af3949..47e4d48 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/TaskGroupKey.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/TaskGroupKey.java
@@ -40,6 +40,15 @@ public final class TaskGroupKey {
     return new TaskGroupKey(task);
   }
 
+  /**
+   * Gets the {@link ITaskConfig} this key was created from.
+   *
+   * @return A task config.
+   */
+  public ITaskConfig getTask() {
+    return canonicalTask;
+  }
+
   @Override
   public int hashCode() {
     return Objects.hash(canonicalTask);

http://git-wip-us.apache.org/repos/asf/aurora/blob/8fd21a1a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index 7bb1e7a..975920a 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -312,6 +312,7 @@ public class SchedulerIT extends BaseZooKeeperTest {
         .setStatus(status)
         .setTaskEvents(ImmutableList.of(new TaskEvent(100, status)))
         .setAssignedTask(new AssignedTask()
+            .setSlaveId("slaveId")
             .setTaskId(id)
             .setTask(new TaskConfig()
                 .setJob(new JobKey("role-" + id, "test", "job-" + id))

http://git-wip-us.apache.org/repos/asf/aurora/blob/8fd21a1a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java
index 8a9a3b7..218ae0d 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java
@@ -14,152 +14,283 @@
 package org.apache.aurora.scheduler.async.preemptor;
 
 import java.util.Arrays;
-import java.util.Set;
 
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.testing.FakeClock;
 
 import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.async.OfferManager;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.stats.CachedCounters;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
-import org.easymock.IExpectationSetters;
+import org.apache.mesos.Protos;
+import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.PENDING_PROCESSOR_RUN_NAME;
+import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.TASK_PROCESSOR_RUN_NAME;
 import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.attemptsStatName;
 import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.slotSearchStatName;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 
 public class PendingTaskProcessorTest extends EasyMockTest {
-  private static final ScheduledTask TASK_A = makeTask("task_a");
-  private static final ScheduledTask TASK_B = makeTask("task_b");
-  private static final PreemptionSlot SLOT_A = createPreemptionSlot(TASK_A);
-  private static final PreemptionSlot SLOT_B = createPreemptionSlot(TASK_B);
-  private static final TaskGroupKey GROUP_A =
-      TaskGroupKey.from(ITaskConfig.build(TASK_A.getAssignedTask().getTask()));
-  private static final TaskGroupKey GROUP_B =
-      TaskGroupKey.from(ITaskConfig.build(TASK_B.getAssignedTask().getTask()));
-  private static final String SLAVE_ID = "slave_id";
-  private static final IJobKey JOB_KEY = IJobKey.build(TASK_A.getAssignedTask().getTask().getJob());
-
+  private static final String CACHE_STAT = "cache_size";
+  private static final String SLAVE_ID_1 = "slave_id_1";
+  private static final String SLAVE_ID_2 = "slave_id_2";
+  private static final JobKey JOB_A = new JobKey("role_a", "env", "job_a");
+  private static final JobKey JOB_B = new JobKey("role_b", "env", "job_b");
+  private static final ScheduledTask TASK_A = makeTask(JOB_A, SLAVE_ID_1, "id1");
+  private static final ScheduledTask TASK_B = makeTask(JOB_B, SLAVE_ID_2, "id2");
+  private static final PreemptionProposal SLOT_A = createPreemptionProposal(TASK_A, SLAVE_ID_1);
   private static final Amount<Long, Time> PREEMPTION_DELAY = Amount.of(30L, Time.SECONDS);
-
-  private static final Set<PreemptionSlot> NO_SLOTS = ImmutableSet.of();
+  private static final Amount<Long, Time> EXPIRATION = Amount.of(10L, Time.MINUTES);
 
   private StorageTestUtil storageUtil;
+  private OfferManager offerManager;
   private FakeStatsProvider statsProvider;
-  private PreemptionSlotFinder preemptionSlotFinder;
+  private PreemptionVictimFilter preemptionVictimFilter;
   private PendingTaskProcessor slotFinder;
-  private BiCache<PreemptionSlot, TaskGroupKey> slotCache;
+  private BiCache<PreemptionProposal, TaskGroupKey> slotCache;
+  private ClusterState clusterState;
   private FakeClock clock;
 
   @Before
   public void setUp() {
     storageUtil = new StorageTestUtil(this);
     storageUtil.expectOperations();
-    preemptionSlotFinder = createMock(PreemptionSlotFinder.class);
-    slotCache = createMock(new Clazz<BiCache<PreemptionSlot, TaskGroupKey>>() { });
+    offerManager = createMock(OfferManager.class);
+    preemptionVictimFilter = createMock(PreemptionVictimFilter.class);
     statsProvider = new FakeStatsProvider();
+    clusterState = createMock(ClusterState.class);
     clock = new FakeClock();
+    slotCache = new BiCache<>(
+        statsProvider,
+        new BiCache.BiCacheSettings(EXPIRATION, CACHE_STAT),
+        clock);
 
     slotFinder = new PendingTaskProcessor(
         storageUtil.storage,
-        preemptionSlotFinder,
+        offerManager,
+        preemptionVictimFilter,
         new PreemptorMetrics(new CachedCounters(statsProvider)),
         PREEMPTION_DELAY,
         slotCache,
+        clusterState,
         clock);
   }
   @Test
   public void testSearchSlotSuccessful() throws Exception {
-    expect(slotCache.getByValue(GROUP_A)).andReturn(NO_SLOTS);
-    expect(slotCache.getByValue(GROUP_B)).andReturn(NO_SLOTS);
     expectGetPendingTasks(TASK_A, TASK_B);
-    expectAttributeAggegateFetchTasks();
-    expectSlotSearch(TASK_A, Optional.of(SLOT_A));
-    expectSlotSearch(TASK_B, Optional.of(SLOT_B));
-    slotCache.put(SLOT_A, GROUP_A);
-    slotCache.put(SLOT_B, GROUP_B);
+    expectGetClusterState(TASK_A, TASK_B);
+    HostOffer offer1 = makeOffer(SLAVE_ID_1);
+    HostOffer offer2 = makeOffer(SLAVE_ID_2);
+    expectOffers(offer1, offer2);
+    expectSlotSearch(TASK_A, offer1, TASK_A);
+    expectSlotSearch(TASK_B, offer2, TASK_B);
 
     control.replay();
 
     clock.advance(PREEMPTION_DELAY);
 
     slotFinder.run();
-    assertEquals(1L, statsProvider.getLongValue(PENDING_PROCESSOR_RUN_NAME));
+    assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME));
     assertEquals(2L, statsProvider.getLongValue(attemptsStatName(true)));
     assertEquals(2L, statsProvider.getLongValue(slotSearchStatName(true, true)));
     assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, true)));
+    assertEquals(2L, statsProvider.getLongValue(CACHE_STAT));
   }
 
   @Test
   public void testSearchSlotFailed() throws Exception {
-    expect(slotCache.getByValue(GROUP_A)).andReturn(NO_SLOTS);
     expectGetPendingTasks(TASK_A);
-    expectAttributeAggegateFetchTasks();
-    expectSlotSearch(TASK_A, Optional.<PreemptionSlot>absent());
+    expectGetClusterState(TASK_A);
+    HostOffer offer1 = makeOffer(SLAVE_ID_1);
+    expectOffers(offer1);
+    expectSlotSearch(TASK_A, offer1);
 
     control.replay();
 
     clock.advance(PREEMPTION_DELAY);
 
     slotFinder.run();
-    assertEquals(1L, statsProvider.getLongValue(PENDING_PROCESSOR_RUN_NAME));
+    assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME));
     assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
     assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, true)));
     assertEquals(1L, statsProvider.getLongValue(slotSearchStatName(false, true)));
   }
 
-  private void expectSlotSearch(ScheduledTask task, Optional<PreemptionSlot> slot) {
-    expect(preemptionSlotFinder.findPreemptionSlotFor(
-        IAssignedTask.build(task.getAssignedTask()),
-        AttributeAggregate.EMPTY,
-        storageUtil.storeProvider)).andReturn(slot);
+  @Test
+  public void testHasCachedSlots() throws Exception {
+    slotCache.put(SLOT_A, group(TASK_A));
+    expectGetPendingTasks(TASK_A);
+    expectGetClusterState(TASK_A);
+    HostOffer offer1 = makeOffer(SLAVE_ID_1);
+    expectOffers(offer1);
+
+    control.replay();
+
+    clock.advance(PREEMPTION_DELAY);
+
+    slotFinder.run();
+    assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME));
+    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, true)));
+    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, true)));
+  }
+
+  @Test
+  public void testMultipleTaskGroups() throws Exception {
+    ScheduledTask task1 = makeTask(JOB_A, "1");
+    ScheduledTask task2 = makeTask(JOB_A, "2");
+    ScheduledTask task3 = makeTask(JOB_A, "3");
+    ScheduledTask task4 = makeTask(JOB_B, "4");
+    ScheduledTask task5 = makeTask(JOB_B, "5");
+
+    expectGetPendingTasks(task1, task4, task2, task5, task3);
+    expectGetClusterState(TASK_A, TASK_B);
+
+    HostOffer offer1 = makeOffer(SLAVE_ID_1);
+    HostOffer offer2 = makeOffer(SLAVE_ID_2);
+    expectOffers(offer1, offer2);
+    expectSlotSearch(task1, offer1);
+    expectSlotSearch(task4, offer1, TASK_B);
+    expectSlotSearch(task5, offer2, TASK_B);
+    PreemptionProposal proposal1 = createPreemptionProposal(TASK_B, SLAVE_ID_1);
+    PreemptionProposal proposal2 = createPreemptionProposal(TASK_B, SLAVE_ID_2);
+
+    control.replay();
+
+    clock.advance(PREEMPTION_DELAY);
+
+    slotFinder.run();
+    assertEquals(slotCache.get(proposal1), Optional.of(group(task4)));
+    assertEquals(slotCache.get(proposal2), Optional.of(group(task5)));
+    assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME));
+    assertEquals(3L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(2L, statsProvider.getLongValue(slotSearchStatName(true, true)));
+    assertEquals(1L, statsProvider.getLongValue(slotSearchStatName(false, true)));
+    assertEquals(2L, statsProvider.getLongValue(CACHE_STAT));
   }
 
-  private static PreemptionSlot createPreemptionSlot(ScheduledTask task) {
+  @Test
+  public void testNoVictims() throws Exception {
+    expectGetClusterState();
+    control.replay();
+
+    clock.advance(PREEMPTION_DELAY);
+
+    slotFinder.run();
+    assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME));
+    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, true)));
+    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, true)));
+  }
+
+  private static final Function<ScheduledTask, String> GET_SLAVE_ID =
+      new Function<ScheduledTask, String>() {
+        @Override
+        public String apply(ScheduledTask task) {
+          return task.getAssignedTask().getSlaveId();
+        }
+      };
+
+  private Multimap<String, PreemptionVictim> getVictims(ScheduledTask... tasks) {
+    return Multimaps.transformValues(
+        Multimaps.index(Arrays.asList(tasks), GET_SLAVE_ID),
+        new Function<ScheduledTask, PreemptionVictim>() {
+          @Override
+          public PreemptionVictim apply(ScheduledTask task) {
+            return PreemptionVictim.fromTask(IAssignedTask.build(task.getAssignedTask()));
+          }
+        }
+    );
+  }
+
+  private HostOffer makeOffer(String slaveId) {
+    Protos.Offer.Builder builder = Protos.Offer.newBuilder();
+    builder.getIdBuilder().setValue("id");
+    builder.getFrameworkIdBuilder().setValue("framework-id");
+    builder.getSlaveIdBuilder().setValue(slaveId);
+    builder.setHostname(slaveId);
+    return new HostOffer(
+        builder.build(),
+        IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE)));
+  }
+
+  private void expectOffers(HostOffer... offers) {
+    expect(offerManager.getOffers()).andReturn(ImmutableSet.copyOf(offers));
+  }
+
+  private void expectGetClusterState(ScheduledTask... returnedTasks) {
+    expect(clusterState.getSlavesToActiveTasks()).andReturn(getVictims(returnedTasks));
+  }
+
+  private void expectSlotSearch(ScheduledTask task, HostOffer offer, ScheduledTask... victims) {
+    expect(preemptionVictimFilter.filterPreemptionVictims(
+        eq(ITaskConfig.build(task.getAssignedTask().getTask())),
+        EasyMock.<Iterable<PreemptionVictim>>anyObject(),
+        anyObject(AttributeAggregate.class),
+        eq(Optional.of(offer)),
+        eq(storageUtil.storeProvider))).andReturn(
+        victims.length == 0
+            ? Optional.<ImmutableSet<PreemptionVictim>>absent()
+            : Optional.of(ImmutableSet.copyOf(getVictims(victims).values())));
+  }
+
+  private static PreemptionProposal createPreemptionProposal(ScheduledTask task, String slaveId) {
     IAssignedTask assigned = IAssignedTask.build(task.getAssignedTask());
-    return new PreemptionSlot(ImmutableSet.of(PreemptionVictim.fromTask(assigned)), SLAVE_ID);
+    return new PreemptionProposal(ImmutableSet.of(PreemptionVictim.fromTask(assigned)), slaveId);
   }
 
-  private static ScheduledTask makeTask(String taskId) {
+  private static ScheduledTask makeTask(JobKey key, String taskId) {
+    return makeTask(key, null, taskId);
+  }
+
+  private static TaskGroupKey group(ScheduledTask task) {
+    return TaskGroupKey.from(ITaskConfig.build(task.getAssignedTask().getTask()));
+  }
+
+  private static ScheduledTask makeTask(JobKey key, @Nullable String slaveId, String taskId) {
     ScheduledTask task = new ScheduledTask()
         .setAssignedTask(new AssignedTask()
+            .setSlaveId(slaveId)
             .setTaskId(taskId)
             .setTask(new TaskConfig()
                 .setPriority(1)
                 .setProduction(true)
-                .setJob(new JobKey("role", "env", "name"))));
+                .setJob(key)));
     task.addToTaskEvents(new TaskEvent(0, PENDING));
     return task;
   }
 
-  private IExpectationSetters<?> expectAttributeAggegateFetchTasks() {
-    return storageUtil.expectTaskFetch(
-        Query.jobScoped(JOB_KEY).byStatus(Tasks.SLAVE_ASSIGNED_STATES));
-  }
-
   private void expectGetPendingTasks(ScheduledTask... returnedTasks) {
     storageUtil.expectTaskFetch(
         Query.statusScoped(PENDING),

http://git-wip-us.apache.org/repos/asf/aurora/blob/8fd21a1a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilterTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilterTest.java
new file mode 100644
index 0000000..67dfb82
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilterTest.java
@@ -0,0 +1,514 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.Constraint;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.configuration.Resources;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
+import org.apache.aurora.scheduler.mesos.TaskExecutors;
+import org.apache.aurora.scheduler.stats.CachedCounters;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.MaintenanceMode.NONE;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.MISSING_ATTRIBUTES_NAME;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
+import static org.apache.mesos.Protos.Offer;
+import static org.apache.mesos.Protos.Resource;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link PreemptionVictimFilter.PreemptionVictimFilterImpl}.
+ *
+ * <p>All candidate victims are placed on the single host {@code HOST}. Tests that mock
+ * {@link SchedulingFilter} exercise only the victim-eligibility rules (priority and production
+ * tier). Tests that use {@link SchedulingFilterImpl} also exercise resource fitting.
+ */
+public class PreemptionVictimFilterTest extends EasyMockTest {
+  private static final String USER_A = "user_a";
+  private static final String USER_B = "user_b";
+  private static final String USER_C = "user_c";
+  private static final String JOB_A = "job_a";
+  private static final String JOB_B = "job_b";
+  private static final String JOB_C = "job_c";
+  private static final String TASK_ID_A = "task_a";
+  private static final String TASK_ID_B = "task_b";
+  private static final String TASK_ID_C = "task_c";
+  private static final String TASK_ID_D = "task_d";
+  private static final String HOST = "host";
+  private static final String RACK = "rack";
+  private static final String SLAVE_ID = HOST + "_id";
+  private static final String RACK_ATTRIBUTE = "rack";
+  private static final String HOST_ATTRIBUTE = "host";
+  private static final String OFFER = "offer";
+  private static final Optional<HostOffer> NO_OFFER = Optional.absent();
+
+  private StorageTestUtil storageUtil;
+  // Assigned by each test: either an EasyMock mock or a real SchedulingFilterImpl.
+  private SchedulingFilter schedulingFilter;
+  private FakeStatsProvider statsProvider;
+  private PreemptorMetrics preemptorMetrics;
+
+  @Before
+  public void setUp() {
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+    statsProvider = new FakeStatsProvider();
+    preemptorMetrics = new PreemptorMetrics(new CachedCounters(statsProvider));
+  }
+
+  /**
+   * Builds a filter around the current {@code schedulingFilter} and asks it for victims.
+   *
+   * @param pendingTask Task waiting for resources; only its task config is passed to the filter.
+   * @param offer Optional offer to consider alongside the victims.
+   * @param victims Candidate victims, expected to be assigned to {@code HOST}.
+   * @return The selected victims, or absent when the filter finds no viable preemption.
+   */
+  private Optional<ImmutableSet<PreemptionVictim>> runFilter(
+      ScheduledTask pendingTask,
+      Optional<HostOffer> offer,
+      ScheduledTask... victims) {
+
+    PreemptionVictimFilter.PreemptionVictimFilterImpl filter =
+        new PreemptionVictimFilter.PreemptionVictimFilterImpl(
+            schedulingFilter,
+            TaskExecutors.NO_OVERHEAD_EXECUTOR,
+            preemptorMetrics);
+
+    return filter.filterPreemptionVictims(
+        ITaskConfig.build(pendingTask.getAssignedTask().getTask()),
+        preemptionVictims(victims),
+        EMPTY,
+        offer,
+        storageUtil.mutableStoreProvider);
+  }
+
+  @Test
+  public void testPreempted() throws Exception {
+    setUpHost();
+
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A);
+    assignToHost(lowPriority);
+
+    ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 100);
+
+    expectFiltering();
+
+    control.replay();
+    assertVictims(runFilter(highPriority, NO_OFFER, lowPriority), lowPriority);
+  }
+
+  @Test
+  public void testLowestPriorityPreempted() throws Exception {
+    setUpHost();
+
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 10);
+    assignToHost(lowPriority);
+
+    ScheduledTask lowerPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 1);
+    assignToHost(lowerPriority);
+
+    ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_C, 100);
+
+    expectFiltering();
+
+    control.replay();
+    // NOTE(review): only lowerPriority is passed to the filter as a candidate, although
+    // lowPriority is also on the host. Confirm whether lowPriority was meant to be a
+    // candidate too.
+    assertVictims(runFilter(highPriority, NO_OFFER, lowerPriority), lowerPriority);
+  }
+
+  @Test
+  public void testOnePreemptableTask() throws Exception {
+    setUpHost();
+
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 100);
+    assignToHost(highPriority);
+
+    ScheduledTask lowerPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 99);
+    assignToHost(lowerPriority);
+
+    ScheduledTask lowestPriority = makeTask(USER_A, JOB_A, TASK_ID_C, 1);
+    assignToHost(lowestPriority);
+
+    ScheduledTask pendingPriority = makeTask(USER_A, JOB_A, TASK_ID_D, 98);
+
+    expectFiltering();
+
+    control.replay();
+    // Only the priority-1 task ranks below the pending priority-98 task.
+    assertVictims(
+        runFilter(pendingPriority, NO_OFFER, highPriority, lowerPriority, lowestPriority),
+        lowestPriority);
+  }
+
+  @Test
+  public void testHigherPriorityRunning() throws Exception {
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 100);
+    assignToHost(highPriority);
+
+    ScheduledTask task = makeTask(USER_A, JOB_A, TASK_ID_A);
+
+    control.replay();
+    assertNoVictims(runFilter(task, NO_OFFER, highPriority));
+  }
+
+  @Test
+  public void testProductionPreemptingNonproduction() throws Exception {
+    setUpHost();
+
+    schedulingFilter = createMock(SchedulingFilter.class);
+    // Use a very low priority for the production task to show that priority is irrelevant.
+    ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_B + "_a1", 100);
+    assignToHost(a1);
+
+    expectFiltering();
+
+    control.replay();
+    assertVictims(runFilter(p1, NO_OFFER, a1), a1);
+  }
+
+  @Test
+  public void testProductionPreemptingNonproductionAcrossUsers() throws Exception {
+    setUpHost();
+
+    schedulingFilter = createMock(SchedulingFilter.class);
+    // Use a very low priority for the production task to show that priority is irrelevant.
+    ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
+    ScheduledTask a1 = makeTask(USER_B, JOB_A, TASK_ID_B + "_a1", 100);
+    assignToHost(a1);
+
+    expectFiltering();
+
+    control.replay();
+    assertVictims(runFilter(p1, NO_OFFER, a1), a1);
+  }
+
+  @Test
+  public void testProductionUsersDoNotPreemptEachOther() throws Exception {
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", 1000);
+    ScheduledTask a1 = makeProductionTask(USER_B, JOB_A, TASK_ID_B + "_a1", 0);
+    assignToHost(a1);
+
+    control.replay();
+    assertNoVictims(runFilter(p1, NO_OFFER, a1));
+  }
+
+  // Ensures a production task can preempt 2 tasks on the same host.
+  @Test
+  public void testProductionPreemptingManyNonProduction() throws Exception {
+    schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+    ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1");
+    b1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+    setUpHost();
+
+    assignToHost(a1);
+    assignToHost(b1);
+
+    ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+    control.replay();
+    assertVictims(runFilter(p1, NO_OFFER, a1, b1), a1, b1);
+  }
+
+  // Ensures we select the minimal number of tasks to preempt.
+  @Test
+  public void testMinimalSetPreempted() throws Exception {
+    schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(4).setRamMb(4096);
+
+    ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1");
+    b1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+    ScheduledTask b2 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b2");
+    b2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+    setUpHost();
+
+    assignToHost(a1);
+    assignToHost(b1);
+    assignToHost(b2);
+
+    ScheduledTask p1 = makeProductionTask(USER_C, JOB_C, TASK_ID_C + "_p1");
+    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+    control.replay();
+    // a1 alone covers p1's needs, so it is preferred over the pair b1 + b2.
+    assertVictims(runFilter(p1, NO_OFFER, b1, b2, a1), a1);
+  }
+
+  // Ensures a production task *never* preempts a production task from another job.
+  @Test
+  public void testProductionJobNeverPreemptsProductionJob() throws Exception {
+    schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+    ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1");
+    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+    setUpHost();
+
+    assignToHost(p1);
+
+    ScheduledTask p2 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p2");
+    p2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+    control.replay();
+    assertNoVictims(runFilter(p2, NO_OFFER, p1));
+  }
+
+  // Ensures that we can preempt if a task + offer can satisfy a pending task.
+  @Test
+  public void testPreemptWithOfferAndTask() throws Exception {
+    schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+
+    setUpHost();
+
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+    assignToHost(a1);
+
+    ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+    control.replay();
+    assertVictims(
+        runFilter(p1, makeOffer(OFFER, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1), a1),
+        a1);
+  }
+
+  // Ensures we can preempt if two tasks and an offer can satisfy a pending task.
+  @Test
+  public void testPreemptWithOfferAndMultipleTasks() throws Exception {
+    schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+
+    setUpHost();
+
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+    assignToHost(a1);
+
+    ScheduledTask a2 = makeTask(USER_A, JOB_B, TASK_ID_A + "_a2");
+    a2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+    assignToHost(a2);
+
+    ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+    p1.getAssignedTask().getTask().setNumCpus(4).setRamMb(2048);
+
+    control.replay();
+    Optional<HostOffer> offer =
+        makeOffer(OFFER, 2, Amount.of(1024L, Data.MB), Amount.of(1L, Data.MB), 1);
+    assertVictims(runFilter(p1, offer, a1, a2), a1, a2);
+  }
+
+  @Test
+  public void testNoPreemptionVictims() {
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask task = makeProductionTask(USER_A, JOB_A, TASK_ID_A);
+
+    control.replay();
+
+    assertNoVictims(runFilter(task, NO_OFFER));
+  }
+
+  // Absent host attributes yield no victims and bump the missing-attributes counter.
+  @Test
+  public void testMissingAttributes() {
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask task = makeProductionTask(USER_A, JOB_A, TASK_ID_A);
+    assignToHost(task);
+
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+    assignToHost(a1);
+
+    expect(storageUtil.attributeStore.getHostAttributes(HOST))
+        .andReturn(Optional.<IHostAttributes>absent());
+
+    control.replay();
+
+    assertNoVictims(runFilter(task, NO_OFFER, a1));
+    assertEquals(1L, statsProvider.getLongValue(MISSING_ATTRIBUTES_NAME));
+  }
+
+  // A veto from the scheduling filter means no victim set is returned.
+  @Test
+  public void testAllVictimsVetoed() {
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask task = makeProductionTask(USER_A, JOB_A, TASK_ID_A);
+    assignToHost(task);
+
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+    assignToHost(a1);
+
+    setUpHost();
+    expectFiltering(Optional.of(Veto.constraintMismatch("ban")));
+
+    control.replay();
+
+    assertNoVictims(runFilter(task, NO_OFFER, a1));
+  }
+
+  // Converts tasks into the PreemptionVictim form consumed and returned by the filter.
+  private static ImmutableSet<PreemptionVictim> preemptionVictims(ScheduledTask... tasks) {
+    return FluentIterable.from(ImmutableSet.copyOf(tasks))
+        .transform(
+            new Function<ScheduledTask, PreemptionVictim>() {
+              @Override
+              public PreemptionVictim apply(ScheduledTask task) {
+                return PreemptionVictim.fromTask(IAssignedTask.build(task.getAssignedTask()));
+              }
+            }).toSet();
+  }
+
+  // Asserts the filter selected exactly the given tasks as victims.
+  private static void assertVictims(
+      Optional<ImmutableSet<PreemptionVictim>> actual,
+      ScheduledTask... expected) {
+
+    assertEquals(Optional.of(preemptionVictims(expected)), actual);
+  }
+
+  // Asserts the filter found no viable victim set.
+  private static void assertNoVictims(Optional<ImmutableSet<PreemptionVictim>> actual) {
+    assertEquals(Optional.<ImmutableSet<PreemptionVictim>>absent(), actual);
+  }
+
+  /**
+   * Builds an offer on {@code HOST}/{@code SLAVE_ID} carrying the given resources.
+   *
+   * @param offerId Mesos offer ID value.
+   * @param cpu Offered CPUs.
+   * @param ram Offered RAM.
+   * @param disk Offered disk.
+   * @param numPorts Number of offered ports.
+   * @return The offer, wrapped in a present Optional.
+   */
+  private static Optional<HostOffer> makeOffer(
+      String offerId,
+      double cpu,
+      Amount<Long, Data> ram,
+      Amount<Long, Data> disk,
+      int numPorts) {
+
+    List<Resource> resources = new Resources(cpu, ram, disk, numPorts).toResourceList();
+    Offer.Builder builder = Offer.newBuilder();
+    builder.getIdBuilder().setValue(offerId);
+    builder.getFrameworkIdBuilder().setValue("framework-id");
+    builder.getSlaveIdBuilder().setValue(SLAVE_ID);
+    builder.setHostname(HOST);
+    for (Resource r: resources) {
+      builder.addResources(r);
+    }
+    return Optional.of(new HostOffer(
+        builder.build(),
+        IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE))));
+  }
+
+  // Expects any number of scheduling-filter calls, each answering with no veto.
+  private IExpectationSetters<Set<SchedulingFilter.Veto>> expectFiltering() {
+    return expectFiltering(Optional.<Veto>absent());
+  }
+
+  // Expects a scheduling-filter call answered with {@code veto} (empty set when absent).
+  private IExpectationSetters<Set<SchedulingFilter.Veto>> expectFiltering(
+      final Optional<Veto> veto) {
+
+    return expect(schedulingFilter.filter(
+        EasyMock.<SchedulingFilter.UnusedResource>anyObject(),
+        EasyMock.<SchedulingFilter.ResourceRequest>anyObject()))
+        .andAnswer(
+            new IAnswer<Set<SchedulingFilter.Veto>>() {
+              @Override
+              public Set<SchedulingFilter.Veto> answer() {
+                return veto.asSet();
+              }
+            });
+  }
+
+  // Builds an unassigned task with the given identity, priority, and tier, and no constraints.
+  static ScheduledTask makeTask(
+      String role,
+      String job,
+      String taskId,
+      int priority,
+      String env,
+      boolean production) {
+
+    AssignedTask assignedTask = new AssignedTask()
+        .setTaskId(taskId)
+        .setTask(new TaskConfig()
+            .setJob(new JobKey(role, env, job))
+            .setPriority(priority)
+            .setProduction(production)
+            .setJobName(job)
+            .setEnvironment(env)
+            .setConstraints(new HashSet<Constraint>()));
+    return new ScheduledTask().setAssignedTask(assignedTask);
+  }
+
+  static ScheduledTask makeTask(String role, String job, String taskId) {
+    return makeTask(role, job, taskId, 0, "dev", false);
+  }
+
+  static void addEvent(ScheduledTask task, ScheduleStatus status) {
+    task.addToTaskEvents(new TaskEvent(0, status));
+  }
+
+  private static ScheduledTask makeProductionTask(String role, String job, String taskId) {
+    return makeTask(role, job, taskId, 0, "prod", true);
+  }
+
+  private static ScheduledTask makeProductionTask(
+      String role,
+      String job,
+      String taskId,
+      int priority) {
+
+    return makeTask(role, job, taskId, priority, "prod", true);
+  }
+
+  private static ScheduledTask makeTask(String role, String job, String taskId, int priority) {
+    return makeTask(role, job, taskId, priority, "dev", false);
+  }
+
+  // Marks the task RUNNING on HOST/SLAVE_ID, as a candidate victim.
+  private static void assignToHost(ScheduledTask task) {
+    task.setStatus(RUNNING);
+    addEvent(task, RUNNING);
+    task.getAssignedTask().setSlaveHost(HOST);
+    task.getAssignedTask().setSlaveId(SLAVE_ID);
+  }
+
+  private static Attribute host(String host) {
+    return new Attribute(HOST_ATTRIBUTE, ImmutableSet.of(host));
+  }
+
+  private static Attribute rack(String rack) {
+    return new Attribute(RACK_ATTRIBUTE, ImmutableSet.of(rack));
+  }
+
+  // Sets up a normal host: no dedicated hosts and no maintenance. The host attribute carries
+  // HOST itself (previously it carried RACK), and the slave ID matches the one used by
+  // assignToHost and makeOffer. Attributes may be fetched any number of times.
+  private void setUpHost() {
+    IHostAttributes hostAttrs = IHostAttributes.build(
+        new HostAttributes().setHost(HOST).setSlaveId(SLAVE_ID)
+            .setMode(NONE).setAttributes(ImmutableSet.of(rack(RACK), host(HOST))));
+
+    expect(storageUtil.attributeStore.getHostAttributes(HOST))
+        .andReturn(Optional.of(hostAttrs)).anyTimes();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/8fd21a1a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
index 64283fa..32d18a9 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
@@ -20,12 +20,14 @@ import com.google.common.collect.ImmutableSet;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
 import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.async.OfferManager;
 import org.apache.aurora.scheduler.async.preemptor.Preemptor.PreemptorImpl;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.base.Tasks;
@@ -33,9 +35,11 @@ import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.stats.CachedCounters;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.apache.mesos.Protos;
 import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
@@ -52,39 +56,47 @@ import static org.junit.Assert.assertEquals;
 public class PreemptorImplTest extends EasyMockTest {
   private static final String SLAVE_ID = "slave_id";
   private static final IScheduledTask TASK = IScheduledTask.build(makeTask());
-  private static final PreemptionSlot SLOT = createPreemptionSlot(TASK);
+  private static final PreemptionProposal PROPOSAL = createPreemptionProposal(TASK);
   private static final TaskGroupKey GROUP_KEY =
       TaskGroupKey.from(ITaskConfig.build(makeTask().getAssignedTask().getTask()));
 
-  private static final Set<PreemptionSlot> NO_SLOTS = ImmutableSet.of();
+  private static final Set<PreemptionProposal> NO_SLOTS = ImmutableSet.of();
   private static final Optional<String> EMPTY_RESULT = Optional.absent();
+  private static final HostOffer OFFER =
+      new HostOffer(Protos.Offer.getDefaultInstance(), IHostAttributes.build(new HostAttributes()));
 
   private StateManager stateManager;
   private FakeStatsProvider statsProvider;
-  private PreemptionSlotFinder preemptionSlotFinder;
+  private PreemptionVictimFilter preemptionVictimFilter;
   private PreemptorImpl preemptor;
-  private BiCache<PreemptionSlot, TaskGroupKey> slotCache;
+  private BiCache<PreemptionProposal, TaskGroupKey> slotCache;
   private Storage.MutableStoreProvider storeProvider;
 
   @Before
   public void setUp() {
     storeProvider = createMock(Storage.MutableStoreProvider.class);
     stateManager = createMock(StateManager.class);
-    preemptionSlotFinder = createMock(PreemptionSlotFinder.class);
-    slotCache = createMock(new Clazz<BiCache<PreemptionSlot, TaskGroupKey>>() { });
+    preemptionVictimFilter = createMock(PreemptionVictimFilter.class);
+    slotCache = createMock(new Clazz<BiCache<PreemptionProposal, TaskGroupKey>>() { });
     statsProvider = new FakeStatsProvider();
+    // Any slave ID resolves to OFFER; expectSlotValidation expects that same offer to be
+    // handed to the victim filter.
+    OfferManager offerManager = createMock(OfferManager.class);
+    expect(offerManager.getOffer(anyObject(Protos.SlaveID.class)))
+        .andReturn(Optional.of(OFFER))
+        .anyTimes();
+
     preemptor = new PreemptorImpl(
         stateManager,
-        preemptionSlotFinder,
+        offerManager,
+        preemptionVictimFilter,
         new PreemptorMetrics(new CachedCounters(statsProvider)),
         slotCache);
   }
 
   @Test
   public void testPreemptTasksSuccessful() throws Exception {
-    expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(SLOT));
-    slotCache.remove(SLOT, GROUP_KEY);
-    expectSlotValidation(Optional.of(ImmutableSet.of(
+    expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL));
+    slotCache.remove(PROPOSAL, GROUP_KEY);
+    expectSlotValidation(PROPOSAL, Optional.of(ImmutableSet.of(
         PreemptionVictim.fromTask(TASK.getAssignedTask()))));
 
     expectPreempted(TASK);
@@ -98,9 +110,9 @@ public class PreemptorImplTest extends EasyMockTest {
 
   @Test
   public void testPreemptTasksValidationFailed() throws Exception {
-    expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(SLOT));
-    slotCache.remove(SLOT, GROUP_KEY);
-    expectSlotValidation(Optional.<ImmutableSet<PreemptionVictim>>absent());
+    expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL));
+    slotCache.remove(PROPOSAL, GROUP_KEY);
+    expectSlotValidation(PROPOSAL, Optional.<ImmutableSet<PreemptionVictim>>absent());
 
     control.replay();
 
@@ -124,11 +136,15 @@ public class PreemptorImplTest extends EasyMockTest {
     return preemptor.attemptPreemptionFor(TASK.getAssignedTask(), EMPTY, storeProvider);
   }
 
-  private void expectSlotValidation(Optional<ImmutableSet<PreemptionVictim>> victims) {
-    expect(preemptionSlotFinder.validatePreemptionSlotFor(
-        TASK.getAssignedTask(),
+  // Expects the victim filter to re-check {@code slot}'s victims for TASK against OFFER,
+  // and makes it answer with {@code victims}.
+  private void expectSlotValidation(
+      PreemptionProposal slot,
+      Optional<ImmutableSet<PreemptionVictim>> victims) {
+
+    expect(preemptionVictimFilter.filterPreemptionVictims(
+        TASK.getAssignedTask().getTask(),
+        slot.getVictims(),
         EMPTY,
-        SLOT,
+        Optional.of(OFFER),
         storeProvider)).andReturn(victims);
   }
 
@@ -142,9 +158,9 @@ public class PreemptorImplTest extends EasyMockTest {
         .andReturn(true);
   }
 
-  private static PreemptionSlot createPreemptionSlot(IScheduledTask task) {
+  // Builds a proposal to preempt only {@code task}, on slave SLAVE_ID.
+  private static PreemptionProposal createPreemptionProposal(IScheduledTask task) {
     IAssignedTask assigned = task.getAssignedTask();
-    return new PreemptionSlot(ImmutableSet.of(PreemptionVictim.fromTask(assigned)), SLAVE_ID);
+    return new PreemptionProposal(ImmutableSet.of(PreemptionVictim.fromTask(assigned)), SLAVE_ID);
   }
 
   private static ScheduledTask makeTask() {


Mime
View raw message