aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject git commit: AURORA-139: When trying to schedule a task, only query once for active tasks in the job.
Date Fri, 31 Jan 2014 03:56:30 GMT
Updated Branches:
  refs/heads/master b86b2fed0 -> f6ef17949


AURORA-139: When trying to schedule a task, only query once for active tasks
in the job.

One functional change in this diff is the removal of a filter in
AttributeFilter, which was unnecessary since the query was already job-scoped.
The filter existed because at one point we supported job configurations that
could request to avoid other jobs, so the task set would include other jobs.

Bugs closed: AURORA-139

Reviewed at https://reviews.apache.org/r/17578/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/f6ef1794
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/f6ef1794
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/f6ef1794

Branch: refs/heads/master
Commit: f6ef17949730cbf31812e20c3be0b34da29626cc
Parents: b86b2fe
Author: Bill Farner <wfarner@apache.org>
Authored: Thu Jan 30 19:54:14 2014 -0800
Committer: Bill Farner <wfarner@apache.org>
Committed: Thu Jan 30 19:54:14 2014 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/async/AsyncModule.java     |   6 +-
 .../aurora/scheduler/async/Preemptor.java       |  21 ++-
 .../aurora/scheduler/async/TaskScheduler.java   |  41 ++++-
 .../events/NotifyingSchedulingFilter.java       |  11 +-
 .../scheduler/filter/AttributeFilter.java       |  10 +-
 .../aurora/scheduler/filter/CachedJobState.java |  71 ++++++++
 .../scheduler/filter/ConstraintFilter.java      |  19 +-
 .../scheduler/filter/SchedulingFilter.java      |   8 +-
 .../scheduler/filter/SchedulingFilterImpl.java  |  35 ++--
 .../aurora/scheduler/state/TaskAssigner.java    |  13 +-
 .../scheduler/async/PreemptorImplTest.java      |  12 +-
 .../scheduler/async/TaskSchedulerImplTest.java  | 178 ++++++++++---------
 .../scheduler/async/TaskSchedulerTest.java      |  45 +++--
 .../events/NotifyingSchedulingFilterTest.java   |  14 +-
 .../filter/SchedulingFilterImplTest.java        | 136 ++++++++------
 .../scheduler/state/TaskAssignerImplTest.java   |  15 +-
 16 files changed, 398 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f6ef1794/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index 72d3621..430ff03 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -49,6 +49,7 @@ import org.apache.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculat
 import org.apache.aurora.scheduler.async.TaskGroups.TaskGroupsSettings;
 import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.apache.aurora.scheduler.filter.CachedJobState;
 
 import static java.lang.annotation.ElementType.FIELD;
 import static java.lang.annotation.ElementType.METHOD;
@@ -130,7 +131,10 @@ public class AsyncModule extends AbstractModule {
   private static final Arg<Boolean> ENABLE_PREEMPTOR = Arg.create(true);
 
   private static final Preemptor NULL_PREEMPTOR = new Preemptor() {
-    @Override public Optional<String> findPreemptionSlotFor(String taskId) {
+    @Override public Optional<String> findPreemptionSlotFor(
+        String taskId,
+        CachedJobState cachedJobState) {
+
       return Optional.absent();
     }
   };

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f6ef1794/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
index c11f483..7bc3ebf 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
@@ -48,6 +48,7 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.filter.CachedJobState;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
@@ -75,9 +76,10 @@ public interface Preemptor {
    * Preempts active tasks in favor of the input task.
    *
    * @param taskId ID of the preempting task.
+   * @param cachedJobState Cached information about the job containing {@code taskId}.
    * @return ID of the slave where preemption occured.
    */
-  Optional<String> findPreemptionSlotFor(String taskId);
+  Optional<String> findPreemptionSlotFor(String taskId, CachedJobState cachedJobState);
 
   /**
    * A task preemptor that tries to find tasks that are waiting to be scheduled, which are of higher
@@ -215,7 +217,8 @@ public interface Preemptor {
     private Optional<Set<IAssignedTask>> getTasksToPreempt(
         Iterable<IAssignedTask> possibleVictims,
         Iterable<Offer> offers,
-        IAssignedTask pendingTask) {
+        IAssignedTask pendingTask,
+        CachedJobState cachedJobState) {
 
       // This enforces the precondition that all of the resources are from the same host. We need to
       // get the host for the schedulingFilter.
@@ -233,7 +236,8 @@ public interface Preemptor {
             slackResources,
             host,
             pendingTask.getTask(),
-            pendingTask.getTaskId());
+            pendingTask.getTaskId(),
+            cachedJobState);
 
         if (vetos.isEmpty()) {
           return Optional.<Set<IAssignedTask>>of(ImmutableSet.<IAssignedTask>of());
@@ -262,7 +266,8 @@ public interface Preemptor {
             totalResource,
             host,
             pendingTask.getTask(),
-            pendingTask.getTaskId());
+            pendingTask.getTaskId(),
+            cachedJobState);
 
         if (vetos.isEmpty()) {
           return Optional.<Set<IAssignedTask>>of(ImmutableSet.copyOf(toPreemptTasks));
@@ -290,7 +295,10 @@ public interface Preemptor {
     }
 
     @Override
-    public synchronized Optional<String> findPreemptionSlotFor(String taskId) {
+    public synchronized Optional<String> findPreemptionSlotFor(
+        String taskId,
+        CachedJobState cachedJobState) {
+
       List<IAssignedTask> pendingTasks =
           fetch(Query.statusScoped(PENDING).byId(taskId), isIdleTask);
 
@@ -322,7 +330,8 @@ public interface Preemptor {
         Optional<Set<IAssignedTask>> toPreemptTasks = getTasksToPreempt(
             slavesToActiveTasks.get(slaveID),
             slavesToOffers.get(slaveID),
-            pendingTask);
+            pendingTask,
+            cachedJobState);
 
         if (toPreemptTasks.isPresent()) {
           for (IAssignedTask toPreempt : toPreemptTasks.get()) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f6ef1794/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index 4afc332..9c4f691 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -17,6 +17,7 @@ package org.apache.aurora.scheduler.async;
 
 import java.lang.annotation.Retention;
 import java.lang.annotation.Target;
+import java.util.EnumSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
@@ -27,10 +28,14 @@ import javax.inject.Inject;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import com.google.common.base.Ticker;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.BindingAnnotation;
 import com.twitter.common.inject.TimedInterceptor.Timed;
@@ -40,14 +45,19 @@ import com.twitter.common.stats.StatImpl;
 import com.twitter.common.stats.Stats;
 import com.twitter.common.util.Clock;
 
+import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.filter.CachedJobState;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.SlaveID;
@@ -127,6 +137,7 @@ interface TaskScheduler extends EventSubscriber {
     }
 
     private Function<Offer, Optional<TaskInfo>> getAssignerFunction(
+        final CachedJobState cachedJobState,
         final String taskId,
         final IScheduledTask task) {
 
@@ -136,14 +147,14 @@ interface TaskScheduler extends EventSubscriber {
           if (reservedTaskId.isPresent()) {
             if (taskId.equals(reservedTaskId.get())) {
               // Slave is reserved to satisfy this task.
-              return assigner.maybeAssign(offer, task);
+              return assigner.maybeAssign(offer, task, cachedJobState);
             } else {
               // Slave is reserved for another task.
               return Optional.absent();
             }
           } else {
             // Slave is not reserved.
-            return assigner.maybeAssign(offer, task);
+            return assigner.maybeAssign(offer, task, cachedJobState);
           }
         }
       };
@@ -153,6 +164,22 @@ interface TaskScheduler extends EventSubscriber {
     static final Optional<String> LAUNCH_FAILED_MSG =
         Optional.of("Unknown exception attempting to schedule task.");
 
+    private static final Iterable<ScheduleStatus> ACTIVE_NOT_PENDING_STATES =
+        EnumSet.copyOf(Sets.difference(Tasks.ACTIVE_STATES, EnumSet.of(ScheduleStatus.PENDING)));
+
+    @VisibleForTesting
+    static Query.Builder activeJobStateQuery(IJobKey jobKey) {
+      return Query.jobScoped(jobKey).byStatus(ACTIVE_NOT_PENDING_STATES);
+    }
+
+    private CachedJobState getJobState(final TaskStore store, final IJobKey jobKey) {
+      return new CachedJobState(Suppliers.memoize(new Supplier<ImmutableSet<IScheduledTask>>() {
+        @Override public ImmutableSet<IScheduledTask> get() {
+          return store.fetchTasks(activeJobStateQuery(jobKey));
+        }
+      }));
+    }
+
     @Timed("task_schedule_attempt")
     @Override
     public TaskSchedulerResult schedule(final String taskId) {
@@ -167,10 +194,12 @@ interface TaskScheduler extends EventSubscriber {
             if (task == null) {
               LOG.warning("Failed to look up task " + taskId + ", it may have been deleted.");
             } else {
+              final CachedJobState cachedJobState =
+                  getJobState(store.getTaskStore(), Tasks.SCHEDULED_TO_JOB_KEY.apply(task));
               try {
-                if (!offerQueue.launchFirst(getAssignerFunction(taskId, task))) {
+                if (!offerQueue.launchFirst(getAssignerFunction(cachedJobState, taskId, task))) {
                   // Task could not be scheduled.
-                  maybePreemptFor(taskId);
+                  maybePreemptFor(taskId, cachedJobState);
                   return TaskSchedulerResult.TRY_AGAIN;
                 }
               } catch (OfferQueue.LaunchException e) {
@@ -198,11 +227,11 @@ interface TaskScheduler extends EventSubscriber {
       }
     }
 
-    private void maybePreemptFor(String taskId) {
+    private void maybePreemptFor(String taskId, CachedJobState cachedJobState) {
       if (reservations.hasReservationForTask(taskId)) {
         return;
       }
-      Optional<String> slaveId = preemptor.findPreemptionSlotFor(taskId);
+      Optional<String> slaveId = preemptor.findPreemptionSlotFor(taskId, cachedJobState);
       if (slaveId.isPresent()) {
         this.reservations.add(SlaveID.newBuilder().setValue(slaveId.get()).build(), taskId);
       }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f6ef1794/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
index c7f4a1b..d2be7ac 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
@@ -25,6 +25,7 @@ import com.google.inject.BindingAnnotation;
 
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
+import org.apache.aurora.scheduler.filter.CachedJobState;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
@@ -60,8 +61,14 @@ class NotifyingSchedulingFilter implements SchedulingFilter {
   }
 
   @Override
-  public Set<Veto> filter(ResourceSlot offer, String slaveHost, ITaskConfig task, String taskId) {
-    Set<Veto> vetoes = delegate.filter(offer, slaveHost, task, taskId);
+  public Set<Veto> filter(
+      ResourceSlot offer,
+      String slaveHost,
+      ITaskConfig task,
+      String taskId,
+      CachedJobState jobState) {
+
+    Set<Veto> vetoes = delegate.filter(offer, slaveHost, task, taskId, jobState);
     if (!vetoes.isEmpty()) {
       eventSink.post(new Vetoed(taskId, vetoes));
     }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f6ef1794/src/main/java/org/apache/aurora/scheduler/filter/AttributeFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/AttributeFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/AttributeFilter.java
index 0816d3f..93ba8d7 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/AttributeFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/AttributeFilter.java
@@ -24,9 +24,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
 import org.apache.aurora.gen.Attribute;
-import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.filter.SchedulingFilterImpl.AttributeLoader;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.IValueConstraint;
 
@@ -64,21 +62,16 @@ final class AttributeFilter {
    * Tests whether an attribute matches a limit constraint.
    *
    * @param attributes Attributes to match against.
-   * @param jobKey Key of the job with the limited constraint.
    * @param limit Limit value.
    * @param activeTasks All active tasks in the system.
    * @param attributeFetcher Interface for fetching attributes for hosts in the system.
    * @return {@code true} if the limit constraint is satisfied, {@code false} otherwise.
    */
   static boolean matches(final Set<Attribute> attributes,
-      final IJobKey jobKey,
       int limit,
       Iterable<IScheduledTask> activeTasks,
       final AttributeLoader attributeFetcher) {
 
-    Predicate<IScheduledTask> sameJob =
-        Predicates.compose(Predicates.equalTo(jobKey), Tasks.SCHEDULED_TO_JOB_KEY);
-
     Predicate<IScheduledTask> hasAttribute = new Predicate<IScheduledTask>() {
       @Override public boolean apply(IScheduledTask task) {
         Iterable<Attribute> hostAttributes =
@@ -87,7 +80,6 @@ final class AttributeFilter {
       }
     };
 
-    return limit > Iterables.size(
-        Iterables.filter(activeTasks, Predicates.and(sameJob, hasAttribute)));
+    return limit > Iterables.size(Iterables.filter(activeTasks, hasAttribute));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f6ef1794/src/main/java/org/apache/aurora/scheduler/filter/CachedJobState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/CachedJobState.java b/src/main/java/org/apache/aurora/scheduler/filter/CachedJobState.java
new file mode 100644
index 0000000..337a5e7
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/filter/CachedJobState.java
@@ -0,0 +1,71 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.filter;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+
+/**
+ * A temporary view of a job's state.  Once constructed, instances of this class should be discarded
+ * once the job state may change (e.g. after exiting a write transaction).  This is intended to
+ * capture job state once and avoid redundant queries.
+ * <p>
+ * Note that while the state injected into this class is lazy (to allow for queries to happen only
+ * on-demand), calling {@link #equals(Object)} and {@link #hashCode()} rely on the result of
+ * {@link #getActiveTasks()}, thus invoking the {@link Supplier}.
+ * <p>
+ * TODO(wfarner): Take this caching one step further and don't store tasks here, but instead store
+ * pre-computed aggregates of host attributes.  For example, translate each IScheduledTask into
+ * the HostAttributes of the machine it resides on, and count the number of times each attribute is
+ * seen.  This could be represented in a <pre>Map&lt;String, Multiset&lt;String&gt;&gt;</pre>, where
+ * the outer key is the attribute name, and the inner key is attribute values.  So, for a job with
+ * two tasks on the same rack but different hosts, you could have this aggregate:
+ * <pre>
+ * { "host": {"hostA": 1, "hostB": 1},
+ *   "rack": {"rack1": 2}
+ * }
+ * </pre>
+ */
+public class CachedJobState {
+
+  private final Supplier<ImmutableSet<IScheduledTask>> activeTaskSupplier;
+
+  public CachedJobState(Supplier<ImmutableSet<IScheduledTask>> activeTaskSupplier) {
+    this.activeTaskSupplier = Preconditions.checkNotNull(activeTaskSupplier);
+  }
+
+  public ImmutableSet<IScheduledTask> getActiveTasks() {
+    return activeTaskSupplier.get();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof CachedJobState)) {
+      return false;
+    }
+
+    CachedJobState other = (CachedJobState) o;
+    return getActiveTasks().equals(other.getActiveTasks());
+  }
+
+  @Override
+  public int hashCode() {
+    return getActiveTasks().hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f6ef1794/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java
index 17fc8b9..8c2313c 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java
@@ -15,7 +15,6 @@
  */
 package org.apache.aurora.scheduler.filter;
 
-import java.util.Collection;
 import java.util.Set;
 import java.util.logging.Logger;
 
@@ -23,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
-import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
@@ -32,8 +30,6 @@ import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.filter.SchedulingFilterImpl.AttributeLoader;
 import org.apache.aurora.scheduler.storage.entities.IConstraint;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConstraint;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -45,27 +41,23 @@ class ConstraintFilter implements Function<IConstraint, Optional<Veto>> {
 
   private static final Logger LOG = Logger.getLogger(ConstraintFilter.class.getName());
 
-  private final IJobKey jobKey;
-  private final Supplier<Collection<IScheduledTask>> activeTasksSupplier;
+  private final CachedJobState cachedjobState;
   private final AttributeLoader attributeLoader;
   private final Iterable<Attribute> hostAttributes;
 
   /**
    * Creates a new constraint filer for a given job.
    *
-   * @param jobKey Key for the job.
-   * @param activeTasksSupplier Supplier to fetch active tasks (if necessary).
+   * @param cachedjobState Cached information about the job containing the task being matched.
    * @param attributeLoader Interface to fetch host attributes (if necessary).
    * @param hostAttributes The attributes of the host to test against.
    */
   ConstraintFilter(
-      IJobKey jobKey,
-      Supplier<Collection<IScheduledTask>> activeTasksSupplier,
+      CachedJobState cachedjobState,
       AttributeLoader attributeLoader,
       Iterable<Attribute> hostAttributes) {
 
-    this.jobKey = checkNotNull(jobKey);
-    this.activeTasksSupplier = checkNotNull(activeTasksSupplier);
+    this.cachedjobState = checkNotNull(cachedjobState);
     this.attributeLoader = checkNotNull(attributeLoader);
     this.hostAttributes = checkNotNull(hostAttributes);
   }
@@ -106,9 +98,8 @@ class ConstraintFilter implements Function<IConstraint, Optional<Veto>> {
 
         boolean satisfied = AttributeFilter.matches(
             attributes,
-            jobKey,
             taskConstraint.getLimit().getLimit(),
-            activeTasksSupplier.get(),
+            cachedjobState.getActiveTasks(),
             attributeLoader);
         return satisfied
             ? Optional.<Veto>absent()

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f6ef1794/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
index 999e0f7..ddb34cf 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
@@ -109,8 +109,14 @@ public interface SchedulingFilter {
    * @param slaveHost Host that the resources are associated with.
    * @param task Task.
    * @param taskId Canonical ID of the task.
+   * @param cachedJobState Cached information about the job containing {@code task}.
    * @return A set of vetoes indicating reasons the task cannot be scheduled.  If the task may be
    *    scheduled, the set will be empty.
    */
-  Set<Veto> filter(ResourceSlot offer, String slaveHost, ITaskConfig task, String taskId);
+  Set<Veto> filter(
+      ResourceSlot offer,
+      String slaveHost,
+      ITaskConfig task,
+      String taskId,
+      CachedJobState cachedJobState);
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f6ef1794/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
index 4ba1483..3420b06 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
@@ -15,7 +15,6 @@
  */
 package org.apache.aurora.scheduler.filter;
 
-import java.util.Collection;
 import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.Set;
@@ -25,23 +24,17 @@ import javax.inject.Inject;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
-import com.google.common.collect.Sets;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Data;
 
 import org.apache.aurora.gen.Attribute;
 import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.scheduler.ResourceSlot;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.storage.AttributeStore;
@@ -49,7 +42,6 @@ import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.Work.Quiet;
 import org.apache.aurora.scheduler.storage.entities.IConstraint;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -198,10 +190,7 @@ public class SchedulingFilterImpl implements SchedulingFilter {
         }
       });
 
-  private static final Iterable<ScheduleStatus> ACTIVE_NOT_PENDING_STATES =
-      EnumSet.copyOf(Sets.difference(Tasks.ACTIVE_STATES, EnumSet.of(ScheduleStatus.PENDING)));
-
-  private FilterRule getConstraintFilter(final String slaveHost) {
+  private FilterRule getConstraintFilter(final CachedJobState jobState, final String slaveHost) {
     return new FilterRule() {
       @Override public Iterable<Veto> apply(final ITaskConfig task) {
         if (!task.isSetConstraints()) {
@@ -220,18 +209,8 @@ public class SchedulingFilterImpl implements SchedulingFilter {
               }
             };
 
-            Supplier<Collection<IScheduledTask>> activeTasksSupplier =
-                Suppliers.memoize(new Supplier<Collection<IScheduledTask>>() {
-                  @Override public Collection<IScheduledTask> get() {
-                    return storeProvider.getTaskStore().fetchTasks(
-                        Query.jobScoped(Tasks.INFO_TO_JOB_KEY.apply(task))
-                            .byStatus(ACTIVE_NOT_PENDING_STATES));
-                  }
-                });
-
             ConstraintFilter constraintFilter = new ConstraintFilter(
-                Tasks.INFO_TO_JOB_KEY.apply(task),
-                activeTasksSupplier,
+                jobState,
                 attributeLoader,
                 attributeLoader.apply(slaveHost));
             ImmutableList.Builder<Veto> vetoes = ImmutableList.builder();
@@ -281,12 +260,18 @@ public class SchedulingFilterImpl implements SchedulingFilter {
   }
 
   @Override
-  public Set<Veto> filter(ResourceSlot offer, String slaveHost, ITaskConfig task, String taskId) {
+  public Set<Veto> filter(
+      ResourceSlot offer,
+      String slaveHost,
+      ITaskConfig task,
+      String taskId,
+      CachedJobState cachedJobState) {
+
     if (!ConfigurationManager.isDedicated(task) && isDedicated(slaveHost)) {
       return ImmutableSet.of(DEDICATED_HOST_VETO);
     }
     return ImmutableSet.<Veto>builder()
-        .addAll(getConstraintFilter(slaveHost).apply(task))
+        .addAll(getConstraintFilter(cachedJobState, slaveHost).apply(task))
         .addAll(getResourceVetoes(offer, task))
         .addAll(getMaintenanceVeto(slaveHost).asSet())
         .build();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f6ef1794/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index d8f326e..3154ef0 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -26,6 +26,7 @@ import org.apache.aurora.scheduler.MesosTaskFactory;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.Resources;
+import org.apache.aurora.scheduler.filter.CachedJobState;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
@@ -46,9 +47,10 @@ public interface TaskAssigner {
    *
    * @param offer The resource offer.
    * @param task The task to match against and optionally assign.
+   * @param cachedJobState Cached information about the job containing {@code task}.
    * @return Instructions for launching the task if matching and assignment were successful.
    */
-  Optional<TaskInfo> maybeAssign(Offer offer, IScheduledTask task);
+  Optional<TaskInfo> maybeAssign(Offer offer, IScheduledTask task, CachedJobState cachedJobState);
 
   class TaskAssignerImpl implements TaskAssigner {
     private static final Logger LOG = Logger.getLogger(TaskAssignerImpl.class.getName());
@@ -83,12 +85,17 @@ public interface TaskAssigner {
     }
 
     @Override
-    public Optional<TaskInfo> maybeAssign(Offer offer, IScheduledTask task) {
+    public Optional<TaskInfo> maybeAssign(
+        Offer offer,
+        IScheduledTask task,
+        CachedJobState cachedJobState) {
+
       Set<Veto> vetoes = filter.filter(
           ResourceSlot.from(offer),
           offer.getHostname(),
           task.getAssignedTask().getTask(),
-          Tasks.id(task));
+          Tasks.id(task),
+          cachedJobState);
       if (vetoes.isEmpty()) {
         return Optional.of(assign(offer, task));
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f6ef1794/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
index 025294f..3384a3a 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Set;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Suppliers;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -44,6 +45,7 @@ import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.Resources;
+import org.apache.aurora.scheduler.filter.CachedJobState;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
 import org.apache.aurora.scheduler.state.MaintenanceController;
@@ -87,6 +89,9 @@ public class PreemptorImplTest extends EasyMockTest {
 
   private static final Amount<Long, Time> PREEMPTION_DELAY = Amount.of(30L, Time.SECONDS);
 
+  private static final CachedJobState EMPTY_JOB =
+      new CachedJobState(Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()));
+
   private StorageTestUtil storageUtil;
   private StateManager stateManager;
   private SchedulingFilter schedulingFilter;
@@ -113,7 +118,7 @@ public class PreemptorImplTest extends EasyMockTest {
         PREEMPTION_DELAY,
         clock);
 
-    preemptor.findPreemptionSlotFor(pendingTask.getAssignedTask().getTaskId());
+    preemptor.findPreemptionSlotFor(pendingTask.getAssignedTask().getTaskId(), EMPTY_JOB);
   }
 
   // TODO(zmanji): Put together a SchedulerPreemptorIntegrationTest as well.
@@ -495,9 +500,10 @@ public class PreemptorImplTest extends EasyMockTest {
   private IExpectationSetters<Set<Veto>> expectFiltering() {
     return expect(schedulingFilter.filter(
         EasyMock.<ResourceSlot>anyObject(),
-        EasyMock.<String>anyObject(),
+        EasyMock.eq(HOST_A),
         EasyMock.<ITaskConfig>anyObject(),
-        EasyMock.<String>anyObject())).andAnswer(
+        EasyMock.<String>anyObject(),
+        EasyMock.eq(EMPTY_JOB))).andAnswer(
         new IAnswer<Set<Veto>>() {
           @Override public Set<Veto> answer() {
             return ImmutableSet.of();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f6ef1794/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index 35dd666..4572895 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -17,6 +17,7 @@ package org.apache.aurora.scheduler.async;
 
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
+import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
@@ -31,9 +32,12 @@ import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
 import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.filter.CachedJobState;
 import org.apache.aurora.scheduler.state.PubsubTestUtil;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;
@@ -47,12 +51,21 @@ import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerResult.SUCCESS;
+import static org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerResult.TRY_AGAIN;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 
 public class TaskSchedulerImplTest extends EasyMockTest {
 
+  private static final IScheduledTask TASK_A = makeTask("a");
+  private static final IScheduledTask TASK_B = makeTask("b");
+  private static final Offer OFFER = Offers.makeOffer("OFFER_A", "HOST_A");
+
+  private static final CachedJobState EMPTY_JOB =
+      new CachedJobState(Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()));
+
   private StorageTestUtil storageUtil;
   private StateManager stateManager;
   private TaskAssigner assigner;
@@ -93,169 +106,160 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     eventSink = PubsubTestUtil.startPubsub(injector);
   }
 
+  private void expectTaskStillPendingQuery(IScheduledTask task) {
+    storageUtil.expectTaskFetch(
+        Query.taskScoped(Tasks.id(task)).byStatus(PENDING),
+        ImmutableSet.of(task));
+  }
+
+  private void expectAssigned(IScheduledTask task) {
+    expect(assigner.maybeAssign(OFFER, task, EMPTY_JOB))
+        .andReturn(Optional.of(TaskInfo.getDefaultInstance()));
+  }
+
   @Test
   public void testReservationsDeniesTasksForTimePeriod() throws OfferQueue.LaunchException {
-    IScheduledTask taskA = makeTask("a");
-    IScheduledTask taskB = makeTask("b");
-    Offer offerA = Offers.makeOffer("OFFER_A", "HOST_A");
-
     storageUtil.expectOperations();
 
-    storageUtil.expectTaskFetch(Query.taskScoped("a").byStatus(PENDING), ImmutableSet.of(taskA));
+    expectTaskStillPendingQuery(TASK_A);
+    expectActiveJobFetch(TASK_A);
     expectLaunchAttempt(false);
     // Reserve "a" with offerA
-    expect(preemptor.findPreemptionSlotFor("a"))
-        .andReturn(Optional.of(offerA.getSlaveId().getValue()));
+    expect(preemptor.findPreemptionSlotFor("a", EMPTY_JOB))
+        .andReturn(Optional.of(OFFER.getSlaveId().getValue()));
 
-    storageUtil.expectTaskFetch(Query.taskScoped("b").byStatus(PENDING), ImmutableSet.of(taskB));
+    expectTaskStillPendingQuery(TASK_B);
+    expectActiveJobFetch(TASK_B);
     Capture<Function<Offer, Optional<TaskInfo>>> firstAssignment = expectLaunchAttempt(false);
-    expect(preemptor.findPreemptionSlotFor("b")).andReturn(Optional.<String>absent());
+    expect(preemptor.findPreemptionSlotFor("b", EMPTY_JOB)).andReturn(Optional.<String>absent());
 
-    storageUtil.expectTaskFetch(Query.taskScoped("b").byStatus(PENDING), ImmutableSet.of(taskB));
+    expectTaskStillPendingQuery(TASK_B);
+    expectActiveJobFetch(TASK_B);
     Capture<Function<Offer, Optional<TaskInfo>>> secondAssignment = expectLaunchAttempt(true);
-
-    expect(assigner.maybeAssign(offerA, taskB))
-        .andReturn(Optional.<TaskInfo>of(TaskInfo.getDefaultInstance()));
+    expectAssigned(TASK_B);
 
     control.replay();
 
-    assertEquals(scheduler.schedule("a"), TaskScheduler.TaskSchedulerResult.TRY_AGAIN);
-    assertEquals(scheduler.schedule("b"), TaskScheduler.TaskSchedulerResult.TRY_AGAIN);
+    assertEquals(TRY_AGAIN, scheduler.schedule("a"));
+    assertEquals(TRY_AGAIN, scheduler.schedule("b"));
 
-    assertEquals(Optional.<TaskInfo>absent(), firstAssignment.getValue().apply(offerA));
+    assertEquals(Optional.<TaskInfo>absent(), firstAssignment.getValue().apply(OFFER));
 
     clock.advance(reservationDuration);
 
-    assertEquals(scheduler.schedule("b"), TaskScheduler.TaskSchedulerResult.SUCCESS);
+    assertEquals(SUCCESS, scheduler.schedule("b"));
 
-    assertEquals(secondAssignment.getValue().apply(offerA).isPresent(), true);
+    assertEquals(true, secondAssignment.getValue().apply(OFFER).isPresent());
   }
 
   @Test
   public void testReservationsExpireAfterAccepted() throws OfferQueue.LaunchException {
-    IScheduledTask taskA = makeTask("a");
-    IScheduledTask taskB = makeTask("b");
-    Offer offerA = Offers.makeOffer("OFFER_A", "HOST_A");
-
     storageUtil.expectOperations();
 
-    storageUtil.expectTaskFetch(Query.taskScoped("a").byStatus(PENDING), ImmutableSet.of(taskA));
+    expectTaskStillPendingQuery(TASK_A);
+    expectActiveJobFetch(TASK_A);
     expectLaunchAttempt(false);
     // Reserve "a" with offerA
-    expect(preemptor.findPreemptionSlotFor("a"))
-        .andReturn(Optional.of(offerA.getSlaveId().getValue()));
-
-    storageUtil.expectTaskFetch(Query.taskScoped("a").byStatus(PENDING), ImmutableSet.of(taskA));
+    expect(preemptor.findPreemptionSlotFor("a", EMPTY_JOB))
+        .andReturn(Optional.of(OFFER.getSlaveId().getValue()));
 
+    expectTaskStillPendingQuery(TASK_A);
+    expectActiveJobFetch(TASK_A);
     Capture<Function<Offer, Optional<TaskInfo>>> firstAssignment = expectLaunchAttempt(true);
+    expectAssigned(TASK_A);
 
-    expect(assigner.maybeAssign(offerA, taskA))
-        .andReturn(Optional.<TaskInfo>of(TaskInfo.getDefaultInstance()));
-
-    storageUtil.expectTaskFetch(Query.taskScoped("b").byStatus(PENDING), ImmutableSet.of(taskB));
+    expectTaskStillPendingQuery(TASK_B);
+    expectActiveJobFetch(TASK_B);
 
     Capture<Function<Offer, Optional<TaskInfo>>> secondAssignment = expectLaunchAttempt(true);
 
-    expect(assigner.maybeAssign(offerA, taskB))
-        .andReturn(Optional.<TaskInfo>of(TaskInfo.getDefaultInstance()));
+    expect(assigner.maybeAssign(OFFER, TASK_B, EMPTY_JOB))
+        .andReturn(Optional.of(TaskInfo.getDefaultInstance()));
 
     control.replay();
-    assertEquals(scheduler.schedule("a"), TaskScheduler.TaskSchedulerResult.TRY_AGAIN);
-    assertEquals(scheduler.schedule("a"), TaskScheduler.TaskSchedulerResult.SUCCESS);
-    firstAssignment.getValue().apply(offerA);
-    eventSink.post(TaskStateChange.transition(taskA, PENDING));
+    assertEquals(TRY_AGAIN, scheduler.schedule("a"));
+    assertEquals(SUCCESS, scheduler.schedule("a"));
+    firstAssignment.getValue().apply(OFFER);
+    eventSink.post(TaskStateChange.transition(TASK_A, PENDING));
     clock.advance(halfReservationDuration);
-    assertEquals(scheduler.schedule("b"), TaskScheduler.TaskSchedulerResult.SUCCESS);
-    secondAssignment.getValue().apply(offerA);
+    assertEquals(SUCCESS, scheduler.schedule("b"));
+    secondAssignment.getValue().apply(OFFER);
   }
 
   @Test
   public void testReservationsAcceptsWithInTimePeriod() throws OfferQueue.LaunchException {
-    IScheduledTask taskA = makeTask("a");
-    Offer offerA = Offers.makeOffer("OFFER_A", "HOST_A");
-
     storageUtil.expectOperations();
-    storageUtil.expectTaskFetch(Query.taskScoped("a").byStatus(PENDING), ImmutableSet.of(taskA));
+    expectTaskStillPendingQuery(TASK_A);
+    expectActiveJobFetch(TASK_A);
     expectLaunchAttempt(false);
     // Reserve "a" with offerA
-    expect(preemptor.findPreemptionSlotFor("a"))
-        .andReturn(Optional.of(offerA.getSlaveId().getValue()));
-
-    storageUtil.expectTaskFetch(Query.taskScoped("a").byStatus(PENDING), ImmutableSet.of(taskA));
+    expect(preemptor.findPreemptionSlotFor("a", EMPTY_JOB))
+        .andReturn(Optional.of(OFFER.getSlaveId().getValue()));
 
+    expectTaskStillPendingQuery(TASK_A);
+    expectActiveJobFetch(TASK_A);
     Capture<Function<Offer, Optional<TaskInfo>>> firstAssignment = expectLaunchAttempt(true);
-
-    expect(assigner.maybeAssign(offerA, taskA))
-        .andReturn(Optional.<TaskInfo>of(TaskInfo.getDefaultInstance()));
+    expectAssigned(TASK_A);
 
     control.replay();
-    assertEquals(scheduler.schedule("a"), TaskScheduler.TaskSchedulerResult.TRY_AGAIN);
+    assertEquals(TRY_AGAIN, scheduler.schedule("a"));
     clock.advance(halfReservationDuration);
-    assertEquals(scheduler.schedule("a"), TaskScheduler.TaskSchedulerResult.SUCCESS);
+    assertEquals(SUCCESS, scheduler.schedule("a"));
 
-    firstAssignment.getValue().apply(offerA);
+    firstAssignment.getValue().apply(OFFER);
   }
 
   @Test
   public void testReservationsCancellation() throws OfferQueue.LaunchException {
-    IScheduledTask taskA = makeTask("a");
-    IScheduledTask taskB = makeTask("b");
-    Offer offerA = Offers.makeOffer("OFFER_A", "HOST_A");
-
     storageUtil.expectOperations();
 
-    storageUtil.expectTaskFetch(Query.taskScoped("a").byStatus(PENDING), ImmutableSet.of(taskA));
+    expectTaskStillPendingQuery(TASK_A);
+    expectActiveJobFetch(TASK_A);
     expectLaunchAttempt(false);
 
     // Reserve "a" with offerA
-    expect(preemptor.findPreemptionSlotFor("a"))
-        .andReturn(Optional.of(offerA.getSlaveId().getValue()));
-
-    storageUtil.expectTaskFetch(Query.taskScoped("b").byStatus(PENDING), ImmutableSet.of(taskB));
+    expect(preemptor.findPreemptionSlotFor("a", EMPTY_JOB))
+        .andReturn(Optional.of(OFFER.getSlaveId().getValue()));
 
+    expectTaskStillPendingQuery(TASK_B);
+    expectActiveJobFetch(TASK_B);
     Capture<Function<Offer, Optional<TaskInfo>>> assignment = expectLaunchAttempt(true);
-
-    expect(assigner.maybeAssign(offerA, taskB))
-        .andReturn(Optional.<TaskInfo>of(TaskInfo.getDefaultInstance()));
+    expectAssigned(TASK_B);
 
     control.replay();
-    assertEquals(scheduler.schedule("a"), TaskScheduler.TaskSchedulerResult.TRY_AGAIN);
+    assertEquals(TRY_AGAIN, scheduler.schedule("a"));
     clock.advance(halfReservationDuration);
     // Task is killed by user before it is scheduled
-    eventSink.post(TaskStateChange.transition(taskA, PENDING));
-    assertEquals(scheduler.schedule("b"), TaskScheduler.TaskSchedulerResult.SUCCESS);
-    assignment.getValue().apply(offerA);
+    eventSink.post(TaskStateChange.transition(TASK_A, PENDING));
+    assertEquals(SUCCESS, scheduler.schedule("b"));
+    assignment.getValue().apply(OFFER);
   }
 
   @Test
   public void testReservationsExpire() throws OfferQueue.LaunchException {
-    IScheduledTask taskA = makeTask("a");
-    IScheduledTask taskB = makeTask("b");
-    Offer offer1 = Offers.makeOffer("OFFER_1", "HOST_A");
-
     storageUtil.expectOperations();
 
-    storageUtil.expectTaskFetch(Query.taskScoped("b").byStatus(PENDING), ImmutableSet.of(taskB));
+    expectTaskStillPendingQuery(TASK_B);
+    expectActiveJobFetch(TASK_B);
     expectLaunchAttempt(false);
     // Reserve "b" with offer1
-    expect(preemptor.findPreemptionSlotFor("b"))
-        .andReturn(Optional.of(offer1.getSlaveId().getValue()));
+    expect(preemptor.findPreemptionSlotFor("b", EMPTY_JOB))
+        .andReturn(Optional.of(OFFER.getSlaveId().getValue()));
 
-    storageUtil.expectTaskFetch(Query.taskScoped("a").byStatus(PENDING), ImmutableSet.of(taskA));
+    expectTaskStillPendingQuery(TASK_A);
+    expectActiveJobFetch(TASK_A);
     Capture<Function<Offer, Optional<TaskInfo>>> firstAssignment = expectLaunchAttempt(true);
-
-    expect(assigner.maybeAssign(offer1, taskA))
-        .andReturn(Optional.<TaskInfo>of(TaskInfo.getDefaultInstance()));
+    expectAssigned(TASK_A);
 
     control.replay();
-    assertEquals(scheduler.schedule("b"), TaskScheduler.TaskSchedulerResult.TRY_AGAIN);
+    assertEquals(TRY_AGAIN, scheduler.schedule("b"));
     // We don't act on the reservation made by b because we want to see timeout behaviour.
     clock.advance(reservationDuration);
-    assertEquals(scheduler.schedule("a"), TaskScheduler.TaskSchedulerResult.SUCCESS);
-    firstAssignment.getValue().apply(offer1);
+    assertEquals(SUCCESS, scheduler.schedule("a"));
+    firstAssignment.getValue().apply(OFFER);
   }
 
-  private IScheduledTask makeTask(String taskId) {
+  private static IScheduledTask makeTask(String taskId) {
     return IScheduledTask.build(new ScheduledTask()
         .setAssignedTask(new AssignedTask()
             .setInstanceId(0)
@@ -272,4 +276,10 @@ public class TaskSchedulerImplTest extends EasyMockTest {
         expect(offerQueue.launchFirst(capture(assignment))).andReturn(taskLaunched);
         return assignment;
   }
+
+  private void expectActiveJobFetch(IScheduledTask taskInJob) {
+    storageUtil.expectTaskFetch(
+        TaskSchedulerImpl.activeJobStateQuery(Tasks.SCHEDULED_TO_JOB_KEY.apply(taskInJob)),
+        ImmutableSet.<IScheduledTask>of());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f6ef1794/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 6c124c5..34e0186 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
+import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.RateLimiter;
@@ -47,6 +48,7 @@ import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.HostMaintenanceStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
+import org.apache.aurora.scheduler.filter.CachedJobState;
 import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;
@@ -90,6 +92,9 @@ public class TaskSchedulerTest extends EasyMockTest {
   private static final Offer OFFER_C = Offers.makeOffer("OFFER_C", "HOST_C");
   private static final Offer OFFER_D = Offers.makeOffer("OFFER_D", "HOST_D");
 
+  private static final CachedJobState EMPTY_JOB =
+      new CachedJobState(Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()));
+
   private Storage storage;
   private MaintenanceController maintenance;
   private StateManager stateManager;
@@ -215,7 +220,7 @@ public class TaskSchedulerTest extends EasyMockTest {
   public void testNoOffers() {
     Capture<Runnable> timeoutCapture = expectTaskGroupBackoff(10);
     expectTaskGroupBackoff(10, 20);
-    expect(preemptor.findPreemptionSlotFor("a")).andReturn(Optional.<String>absent());
+    expect(preemptor.findPreemptionSlotFor("a", EMPTY_JOB)).andReturn(Optional.<String>absent());
 
     replayAndCreateScheduler();
 
@@ -278,16 +283,16 @@ public class TaskSchedulerTest extends EasyMockTest {
     TaskInfo mesosTask = makeTaskInfo(task);
 
     Capture<Runnable> timeoutCapture = expectTaskGroupBackoff(10);
-    expect(assigner.maybeAssign(OFFER_A, task)).andReturn(Optional.<TaskInfo>absent());
-    expect(preemptor.findPreemptionSlotFor("a")).andReturn(Optional.<String>absent());
+    expect(assigner.maybeAssign(OFFER_A, task, EMPTY_JOB)).andReturn(Optional.<TaskInfo>absent());
+    expect(preemptor.findPreemptionSlotFor("a", EMPTY_JOB)).andReturn(Optional.<String>absent());
 
     Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(10, 20);
-    expect(assigner.maybeAssign(OFFER_A, task)).andReturn(Optional.of(mesosTask));
+    expect(assigner.maybeAssign(OFFER_A, task, EMPTY_JOB)).andReturn(Optional.of(mesosTask));
     driver.launchTask(OFFER_A.getId(), mesosTask);
 
     Capture<Runnable> timeoutCapture3 = expectTaskGroupBackoff(10);
     expectTaskGroupBackoff(10, 20);
-    expect(preemptor.findPreemptionSlotFor("b")).andReturn(Optional.<String>absent());
+    expect(preemptor.findPreemptionSlotFor("b", EMPTY_JOB)).andReturn(Optional.<String>absent());
 
     replayAndCreateScheduler();
 
@@ -313,7 +318,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     Capture<Runnable> timeoutCapture = expectTaskGroupBackoff(10);
     expectAnyMaintenanceCalls();
     expectOfferDeclineIn(10);
-    expect(assigner.maybeAssign(OFFER_A, task)).andReturn(Optional.of(mesosTask));
+    expect(assigner.maybeAssign(OFFER_A, task, EMPTY_JOB)).andReturn(Optional.of(mesosTask));
     driver.launchTask(OFFER_A.getId(), mesosTask);
     expectLastCall().andThrow(new IllegalStateException("Driver not ready."));
     expect(stateManager.changeState(
@@ -342,10 +347,11 @@ public class TaskSchedulerTest extends EasyMockTest {
     Capture<Runnable> timeoutCapture = expectTaskGroupBackoff(10);
     expectAnyMaintenanceCalls();
     expectOfferDeclineIn(10);
-    expect(assigner.maybeAssign(OFFER_A, task)).andThrow(new StorageException("Injected failure."));
+    expect(assigner.maybeAssign(OFFER_A, task, EMPTY_JOB))
+        .andThrow(new StorageException("Injected failure."));
 
     Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(10, 20);
-    expect(assigner.maybeAssign(OFFER_A, task)).andReturn(Optional.of(mesosTask));
+    expect(assigner.maybeAssign(OFFER_A, task, EMPTY_JOB)).andReturn(Optional.of(mesosTask));
     driver.launchTask(OFFER_A.getId(), mesosTask);
     expectLastCall();
 
@@ -364,12 +370,12 @@ public class TaskSchedulerTest extends EasyMockTest {
     Capture<Runnable> timeoutCapture = expectTaskGroupBackoff(10);
     Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
     expectAnyMaintenanceCalls();
-    expect(assigner.maybeAssign(OFFER_A, task)).andReturn(Optional.<TaskInfo>absent());
+    expect(assigner.maybeAssign(OFFER_A, task, EMPTY_JOB)).andReturn(Optional.<TaskInfo>absent());
     Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(10, 20);
-    expect(preemptor.findPreemptionSlotFor("a")).andReturn(Optional.<String>absent());
+    expect(preemptor.findPreemptionSlotFor("a", EMPTY_JOB)).andReturn(Optional.<String>absent());
     driver.declineOffer(OFFER_A.getId());
     expectTaskGroupBackoff(20, 30);
-    expect(preemptor.findPreemptionSlotFor("a")).andReturn(Optional.<String>absent());
+    expect(preemptor.findPreemptionSlotFor("a", EMPTY_JOB)).andReturn(Optional.<String>absent());
 
     replayAndCreateScheduler();
 
@@ -429,13 +435,13 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     IScheduledTask taskA = makeTask("A", PENDING);
     TaskInfo mesosTaskA = makeTaskInfo(taskA);
-    expect(assigner.maybeAssign(OFFER_A, taskA)).andReturn(Optional.of(mesosTaskA));
+    expect(assigner.maybeAssign(OFFER_A, taskA, EMPTY_JOB)).andReturn(Optional.of(mesosTaskA));
     driver.launchTask(OFFER_A.getId(), mesosTaskA);
     Capture<Runnable> captureA = expectTaskGroupBackoff(10);
 
     IScheduledTask taskB = makeTask("B", PENDING);
     TaskInfo mesosTaskB = makeTaskInfo(taskB);
-    expect(assigner.maybeAssign(OFFER_B, taskB)).andReturn(Optional.of(mesosTaskB));
+    expect(assigner.maybeAssign(OFFER_B, taskB, EMPTY_JOB)).andReturn(Optional.of(mesosTaskB));
     driver.launchTask(OFFER_B.getId(), mesosTaskB);
     Capture<Runnable> captureB = expectTaskGroupBackoff(10);
 
@@ -464,13 +470,13 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     IScheduledTask taskA = makeTask("A", PENDING);
     TaskInfo mesosTaskA = makeTaskInfo(taskA);
-    expect(assigner.maybeAssign(OFFER_B, taskA)).andReturn(Optional.of(mesosTaskA));
+    expect(assigner.maybeAssign(OFFER_B, taskA, EMPTY_JOB)).andReturn(Optional.of(mesosTaskA));
     driver.launchTask(OFFER_B.getId(), mesosTaskA);
     Capture<Runnable> captureA = expectTaskGroupBackoff(10);
 
     IScheduledTask taskB = makeTask("B", PENDING);
     TaskInfo mesosTaskB = makeTaskInfo(taskB);
-    expect(assigner.maybeAssign(OFFER_C, taskB)).andReturn(Optional.of(mesosTaskB));
+    expect(assigner.maybeAssign(OFFER_C, taskB, EMPTY_JOB)).andReturn(Optional.of(mesosTaskB));
     driver.launchTask(OFFER_C.getId(), mesosTaskB);
     Capture<Runnable> captureB = expectTaskGroupBackoff(10);
 
@@ -496,7 +502,10 @@ public class TaskSchedulerTest extends EasyMockTest {
   private Capture<IScheduledTask> expectTaskScheduled(IScheduledTask task) {
     TaskInfo mesosTask = makeTaskInfo(task);
     Capture<IScheduledTask> taskScheduled = createCapture();
-    expect(assigner.maybeAssign(EasyMock.<Offer>anyObject(), capture(taskScheduled)))
+    expect(assigner.maybeAssign(
+        EasyMock.<Offer>anyObject(),
+        capture(taskScheduled),
+        EasyMock.eq(EMPTY_JOB)))
         .andReturn(Optional.of(mesosTask));
     driver.launchTask(EasyMock.<OfferID>anyObject(), eq(mesosTask));
     return taskScheduled;
@@ -562,9 +571,9 @@ public class TaskSchedulerTest extends EasyMockTest {
     final IScheduledTask task = makeTask("a", PENDING);
 
     Capture<Runnable> timeoutCapture = expectTaskGroupBackoff(10);
-    expect(assigner.maybeAssign(OFFER_A, task)).andReturn(Optional.<TaskInfo>absent());
+    expect(assigner.maybeAssign(OFFER_A, task, EMPTY_JOB)).andReturn(Optional.<TaskInfo>absent());
     expectTaskGroupBackoff(10, 20);
-    expect(preemptor.findPreemptionSlotFor("a")).andReturn(Optional.<String>absent());
+    expect(preemptor.findPreemptionSlotFor("a", EMPTY_JOB)).andReturn(Optional.<String>absent());
 
     replayAndCreateScheduler();
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f6ef1794/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
index c5379fe..4ac059c 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
@@ -17,14 +17,17 @@ package org.apache.aurora.scheduler.events;
 
 import java.util.Set;
 
+import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableSet;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
+import org.apache.aurora.scheduler.filter.CachedJobState;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.junit.Before;
 import org.junit.Test;
@@ -45,6 +48,9 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
   private static final Veto VETO_1 = new Veto("veto1", 1);
   private static final Veto VETO_2 = new Veto("veto2", 2);
 
+  private static final CachedJobState EMPTY_JOB =
+      new CachedJobState(Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()));
+
   private SchedulingFilter filter;
 
   private EventSink eventSink;
@@ -60,21 +66,21 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
   @Test
   public void testEvents() {
     Set<Veto> vetoes = ImmutableSet.of(VETO_1, VETO_2);
-    expect(delegate.filter(TASK_RESOURCES, SLAVE, TASK, TASK_ID)).andReturn(vetoes);
+    expect(delegate.filter(TASK_RESOURCES, SLAVE, TASK, TASK_ID, EMPTY_JOB)).andReturn(vetoes);
     eventSink.post(new Vetoed(TASK_ID, vetoes));
 
     control.replay();
 
-    assertEquals(vetoes, filter.filter(TASK_RESOURCES, SLAVE, TASK, TASK_ID));
+    assertEquals(vetoes, filter.filter(TASK_RESOURCES, SLAVE, TASK, TASK_ID, EMPTY_JOB));
   }
 
   @Test
   public void testNoVetoes() {
     Set<Veto> vetoes = ImmutableSet.of();
-    expect(delegate.filter(TASK_RESOURCES, SLAVE, TASK, TASK_ID)).andReturn(vetoes);
+    expect(delegate.filter(TASK_RESOURCES, SLAVE, TASK, TASK_ID, EMPTY_JOB)).andReturn(vetoes);
 
     control.replay();
 
-    assertEquals(vetoes, filter.filter(TASK_RESOURCES, SLAVE, TASK, TASK_ID));
+    assertEquals(vetoes, filter.filter(TASK_RESOURCES, SLAVE, TASK, TASK_ID, EMPTY_JOB));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f6ef1794/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index f3df73c..2ace24a 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -20,6 +20,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import com.twitter.common.quantity.Amount;
@@ -39,7 +40,6 @@ import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.gen.ValueConstraint;
 import org.apache.aurora.scheduler.ResourceSlot;
-import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.state.MaintenanceController;
@@ -47,7 +47,6 @@ import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.Work.Quiet;
-import org.apache.aurora.scheduler.storage.TaskStore;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.easymock.EasyMock;
@@ -64,7 +63,6 @@ import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVe
 import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.DISK;
 import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.PORTS;
 import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.RAM;
-import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -103,13 +101,15 @@ public class SchedulingFilterImplTest extends EasyMockTest {
       Amount.of(DEFAULT_DISK, Data.MB),
       0);
 
+  private static final CachedJobState EMPTY_JOB =
+      new CachedJobState(Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()));
+
   private final AtomicLong taskIdCounter = new AtomicLong();
 
   private SchedulingFilter defaultFilter;
   private MaintenanceController maintenance;
   private Storage storage;
   private StoreProvider storeProvider;
-  private TaskStore.Mutable taskStore;
   private AttributeStore.Mutable attributeStore;
 
   @Before
@@ -118,13 +118,11 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     maintenance = createMock(MaintenanceController.class);
     defaultFilter = new SchedulingFilterImpl(storage, maintenance);
     storeProvider = createMock(StoreProvider.class);
-    taskStore = createMock(TaskStore.Mutable.class);
     attributeStore = createMock(AttributeStore.Mutable.class);
 
     // Link the store provider to the store mocks.
     expectReads();
 
-    expect(storeProvider.getTaskStore()).andReturn(taskStore).anyTimes();
     expect(storeProvider.getAttributeStore()).andReturn(attributeStore).anyTimes();
   }
 
@@ -142,19 +140,17 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   public void testMeetsOffer() throws Exception {
     expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
     expectGetHostMaintenanceStatus(HOST_A).times(2);
-    expectGetTasks().times(2);
 
     control.replay();
 
     assertNoVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK));
-    assertNoVetoes(makeTask(DEFAULT_CPUS - 1, DEFAULT_RAM - 1, DEFAULT_DISK - 1));
+    assertNoVetoes(makeTask(DEFAULT_CPUS - 1, DEFAULT_RAM - 1, DEFAULT_DISK - 1), EMPTY_JOB);
   }
 
   @Test
   public void testSufficientPorts() throws Exception {
     expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
     expectGetHostMaintenanceStatus(HOST_A).times(4);
-    expectGetTasks().times(4);
 
     control.replay();
 
@@ -178,19 +174,18 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         .setRequestedPorts(ImmutableSet.of("one", "two", "three")));
 
     Set<Veto> none = ImmutableSet.of();
-    assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, noPortTask, TASK_ID));
-    assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, onePortTask, TASK_ID));
-    assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, twoPortTask, TASK_ID));
+    assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, noPortTask, TASK_ID, EMPTY_JOB));
+    assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, onePortTask, TASK_ID, EMPTY_JOB));
+    assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, twoPortTask, TASK_ID, EMPTY_JOB));
     assertEquals(
         ImmutableSet.of(PORTS.veto(1)),
-        defaultFilter.filter(twoPorts, HOST_A, threePortTask, TASK_ID));
+        defaultFilter.filter(twoPorts, HOST_A, threePortTask, TASK_ID, EMPTY_JOB));
   }
 
   @Test
   public void testInsufficientResources() throws Exception {
     expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
     expectGetHostMaintenanceStatus(HOST_A).times(4);
-    expectGetTasks().times(4);
 
     control.replay();
 
@@ -215,7 +210,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
 
   @Test
   public void testSharedDedicatedHost() throws Exception {
-    String dedicated1 = "ads/adserver";
+    String dedicated1 = "userA/jobA";
     String dedicated2 = "kestrel/kestrel";
 
     expectGetHostAttributes(HOST_A, dedicated(dedicated1, dedicated2)).anyTimes();
@@ -224,8 +219,8 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     control.replay();
 
     assertNoVetoes(checkConstraint(
-        new Identity().setRole("ads"),
-        "adserver",
+        new Identity().setRole("userA"),
+        "jobA",
         HOST_A,
         DEDICATED_ATTRIBUTE,
         true,
@@ -258,7 +253,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   @Test
   public void testHostScheduledForMaintenance() throws Exception {
     expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
-    expectGetTasks();
     expectGetHostMaintenanceStatus(HOST_A, MaintenanceMode.SCHEDULED);
 
     control.replay();
@@ -269,7 +263,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   @Test
   public void testHostDrainingForMaintenance() throws Exception {
     expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
-    expectGetTasks();
     expectGetHostMaintenanceStatus(HOST_A, MaintenanceMode.DRAINING);
 
     control.replay();
@@ -280,7 +273,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   @Test
   public void testHostDrainedForMaintenance() throws Exception {
     expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
-    expectGetTasks();
     expectGetHostMaintenanceStatus(HOST_A, MaintenanceMode.DRAINED);
 
     control.replay();
@@ -300,7 +292,9 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     Constraint constraint1 = makeConstraint("host", HOST_A);
     Constraint constraint2 = makeConstraint(DEDICATED_ATTRIBUTE, "xxx");
 
-    assertVetoes(makeTask(OWNER_A, JOB_A, constraint1, constraint2), HOST_A,
+    assertVetoes(
+        makeTask(OWNER_A, JOB_A, constraint1, constraint2),
+        HOST_A,
         mismatchVeto(DEDICATED_ATTRIBUTE));
     assertNoVetoes(makeTask(OWNER_B, JOB_B, constraint1, constraint2), HOST_B);
   }
@@ -332,7 +326,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   public void testUnderLimitNoTasks() throws Exception {
     expectGetHostAttributes(HOST_A);
     expectGetHostAttributes(HOST_A, host(HOST_A));
-    expectGetTasks();
     expectGetHostMaintenanceStatus(HOST_A);
 
     control.replay();
@@ -361,31 +354,31 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     expectGetHostMaintenanceStatus(HOST_B).atLeastOnce();
     expectGetHostMaintenanceStatus(HOST_C).atLeastOnce();
 
-    expectGetTasks(
+    CachedJobState stateA = new CachedJobState(Suppliers.ofInstance(ImmutableSet.of(
         makeScheduledTask(OWNER_A, JOB_A, HOST_A),
-        makeScheduledTask(OWNER_B, JOB_A, HOST_A),
-        makeScheduledTask(OWNER_B, JOB_A, HOST_A),
         makeScheduledTask(OWNER_A, JOB_A, HOST_B),
         makeScheduledTask(OWNER_A, JOB_A, HOST_B),
-        makeScheduledTask(OWNER_B, JOB_A, HOST_B),
-        makeScheduledTask(OWNER_A, JOB_A, HOST_C))
-        .atLeastOnce();
+        makeScheduledTask(OWNER_A, JOB_A, HOST_C))));
+    CachedJobState stateB = new CachedJobState(Suppliers.ofInstance(ImmutableSet.of(
+        makeScheduledTask(OWNER_B, JOB_A, HOST_A),
+        makeScheduledTask(OWNER_B, JOB_A, HOST_A),
+        makeScheduledTask(OWNER_B, JOB_A, HOST_B))));
 
     control.replay();
 
-    assertNoVetoes(hostLimitTask(OWNER_A, JOB_A, 2), HOST_A);
-    assertVetoes(hostLimitTask(OWNER_A, JOB_A, 1), HOST_B, limitVeto(HOST_ATTRIBUTE));
-    assertVetoes(hostLimitTask(OWNER_A, JOB_A, 2), HOST_B, limitVeto(HOST_ATTRIBUTE));
-    assertNoVetoes(hostLimitTask(OWNER_A, JOB_A, 3), HOST_B);
+    assertNoVetoes(hostLimitTask(OWNER_A, JOB_A, 2), HOST_A, stateA);
+    assertVetoes(hostLimitTask(OWNER_A, JOB_A, 1), HOST_B, stateA, limitVeto(HOST_ATTRIBUTE));
+    assertVetoes(hostLimitTask(OWNER_A, JOB_A, 2), HOST_B, stateA, limitVeto(HOST_ATTRIBUTE));
+    assertNoVetoes(hostLimitTask(OWNER_A, JOB_A, 3), HOST_B, stateA);
 
-    assertVetoes(rackLimitTask(OWNER_B, JOB_A, 2), HOST_B, limitVeto(RACK_ATTRIBUTE));
-    assertVetoes(rackLimitTask(OWNER_B, JOB_A, 3), HOST_B, limitVeto(RACK_ATTRIBUTE));
-    assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 4), HOST_B);
+    assertVetoes(rackLimitTask(OWNER_B, JOB_A, 2), HOST_B, stateB, limitVeto(RACK_ATTRIBUTE));
+    assertVetoes(rackLimitTask(OWNER_B, JOB_A, 3), HOST_B, stateB, limitVeto(RACK_ATTRIBUTE));
+    assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 4), HOST_B, stateB);
 
-    assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 1), HOST_C);
+    assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 1), HOST_C, stateB);
 
-    assertVetoes(rackLimitTask(OWNER_A, JOB_A, 1), HOST_C, limitVeto(RACK_ATTRIBUTE));
-    assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 2), HOST_C);
+    assertVetoes(rackLimitTask(OWNER_A, JOB_A, 1), HOST_C, stateA, limitVeto(RACK_ATTRIBUTE));
+    assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 2), HOST_C, stateB);
   }
 
   @Test
@@ -440,13 +433,15 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     Constraint zoneConstraint = makeConstraint("zone", "c");
 
     ITaskConfig task = makeTask(OWNER_A, JOB_A, jvmConstraint, zoneConstraint);
-    assertTrue(defaultFilter.filter(DEFAULT_OFFER, HOST_A, task, TASK_ID).isEmpty());
+    assertTrue(defaultFilter.filter(DEFAULT_OFFER, HOST_A, task, TASK_ID, EMPTY_JOB).isEmpty());
 
     Constraint jvmNegated = jvmConstraint.deepCopy();
     jvmNegated.getConstraint().getValue().setNegated(true);
     Constraint zoneNegated = jvmConstraint.deepCopy();
     zoneNegated.getConstraint().getValue().setNegated(true);
-    assertVetoes(makeTask(OWNER_A, JOB_A, jvmNegated, zoneNegated), HOST_A,
+    assertVetoes(
+        makeTask(OWNER_A, JOB_A, jvmNegated, zoneNegated),
+        HOST_A,
         mismatchVeto("jvm"));
   }
 
@@ -467,7 +462,14 @@ public class SchedulingFilterImplTest extends EasyMockTest {
       String value,
       String... vs) {
 
-    return checkConstraint(OWNER_A, JOB_A, host, constraintName, expected, value, vs);
+    return checkConstraint(
+        OWNER_A,
+        JOB_A,
+        host,
+        constraintName,
+        expected,
+        value,
+        vs);
   }
 
   private ITaskConfig checkConstraint(
@@ -479,7 +481,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
       String value,
       String... vs) {
 
-    return checkConstraint(owner, jobName, host, constraintName, expected,
+    return checkConstraint(owner, jobName, EMPTY_JOB, host, constraintName, expected,
         new ValueConstraint(false,
             ImmutableSet.<String>builder().add(value).addAll(Arrays.asList(vs)).build()));
   }
@@ -487,6 +489,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   private ITaskConfig checkConstraint(
       Identity owner,
       String jobName,
+      CachedJobState cachedJobState,
       String host,
       String constraintName,
       boolean expected,
@@ -496,32 +499,58 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     ITaskConfig task = makeTask(owner, jobName, constraint);
     assertEquals(
         expected,
-        defaultFilter.filter(DEFAULT_OFFER, host, task, TASK_ID).isEmpty());
+        defaultFilter.filter(DEFAULT_OFFER, host, task, TASK_ID, cachedJobState).isEmpty());
 
     Constraint negated = constraint.deepCopy();
     negated.getConstraint().getValue().setNegated(!value.isNegated());
     ITaskConfig negatedTask = makeTask(owner, jobName, negated);
     assertEquals(
         !expected,
-        defaultFilter.filter(DEFAULT_OFFER, host, negatedTask, TASK_ID).isEmpty());
+        defaultFilter.filter(DEFAULT_OFFER, host, negatedTask, TASK_ID, cachedJobState).isEmpty());
     return task;
   }
 
   private void assertNoVetoes(ITaskConfig task) {
-    assertNoVetoes(task, HOST_A);
+    assertNoVetoes(task, EMPTY_JOB);
+  }
+
+  private void assertNoVetoes(ITaskConfig task, CachedJobState jobState) {
+    assertNoVetoes(task, HOST_A, jobState);
   }
 
   private void assertNoVetoes(ITaskConfig task, String host) {
-    assertVetoes(task, host);
+    assertVetoes(task, host, EMPTY_JOB);
+  }
+
+  private void assertNoVetoes(ITaskConfig task, String host, CachedJobState jobState) {
+    assertVetoes(task, host, jobState);
   }
 
   private void assertVetoes(ITaskConfig task, Veto... vetos) {
-    assertVetoes(task, HOST_A, vetos);
+    assertVetoes(task, EMPTY_JOB, vetos);
   }
 
-  private void assertVetoes(ITaskConfig task, String host, Veto... vetoes) {
-    assertEquals(ImmutableSet.copyOf(vetoes),
-        defaultFilter.filter(DEFAULT_OFFER, host, task, TASK_ID));
+  private void assertVetoes(ITaskConfig task, CachedJobState jobState, Veto... vetos) {
+    assertVetoes(task, HOST_A, jobState, vetos);
+  }
+
+  private void assertVetoes(
+      ITaskConfig task,
+      String host,
+      Veto... vetoes) {
+
+    assertVetoes(task, host, EMPTY_JOB, vetoes);
+  }
+
+  private void assertVetoes(
+      ITaskConfig task,
+      String host,
+      CachedJobState jobState,
+      Veto... vetoes) {
+
+    assertEquals(
+        ImmutableSet.copyOf(vetoes),
+        defaultFilter.filter(DEFAULT_OFFER, host, task, TASK_ID, jobState));
   }
 
   private Attribute valueAttribute(String name, String string, String... strings) {
@@ -534,13 +563,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         TaskConstraint.value(new ValueConstraint(false, ImmutableSet.copyOf(values))));
   }
 
-  private IExpectationSetters<ImmutableSet<IScheduledTask>> expectGetTasks(
-      IScheduledTask... tasks) {
-
-    return expect(taskStore.fetchTasks((Query.Builder) anyObject()))
-        .andReturn(ImmutableSet.copyOf(tasks));
-  }
-
   private IExpectationSetters<MaintenanceMode> expectGetHostMaintenanceStatus(String host) {
     return expectGetHostMaintenanceStatus(host, MaintenanceMode.NONE);
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f6ef1794/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
index 154ccc4..940dac4 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
@@ -16,6 +16,7 @@
 package org.apache.aurora.scheduler.state;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableSet;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
@@ -26,6 +27,7 @@ import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.MesosTaskFactory;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.filter.CachedJobState;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl;
@@ -73,6 +75,9 @@ public class TaskAssignerImplTest extends EasyMockTest {
       .setSlaveId(OFFER.getSlaveId())
       .build();
 
+  private static final CachedJobState EMPTY_JOB =
+      new CachedJobState(Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()));
+
   private StateManager stateManager;
   private SchedulingFilter filter;
   private MesosTaskFactory taskFactory;
@@ -92,7 +97,8 @@ public class TaskAssignerImplTest extends EasyMockTest {
         ResourceSlot.from(OFFER),
         OFFER.getHostname(),
         TASK.getAssignedTask().getTask(),
-        Tasks.id(TASK)))
+        Tasks.id(TASK),
+        EMPTY_JOB))
         .andReturn(ImmutableSet.<Veto>of());
     expect(stateManager.assignTask(
         Tasks.id(TASK),
@@ -104,7 +110,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertEquals(Optional.of(TASK_INFO), assigner.maybeAssign(OFFER, TASK));
+    assertEquals(Optional.of(TASK_INFO), assigner.maybeAssign(OFFER, TASK, EMPTY_JOB));
   }
 
 
@@ -114,11 +120,12 @@ public class TaskAssignerImplTest extends EasyMockTest {
         ResourceSlot.from(OFFER),
         OFFER.getHostname(),
         TASK.getAssignedTask().getTask(),
-        Tasks.id(TASK)))
+        Tasks.id(TASK),
+        EMPTY_JOB))
         .andReturn(ImmutableSet.of(Veto.constraintMismatch("denied")));
 
     control.replay();
 
-    assertEquals(Optional.<TaskInfo>absent(), assigner.maybeAssign(OFFER, TASK));
+    assertEquals(Optional.<TaskInfo>absent(), assigner.maybeAssign(OFFER, TASK, EMPTY_JOB));
   }
 }


Mime
View raw message