aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject git commit: Compute task host attribute aggregates once when scheduling tasks.
Date Wed, 19 Feb 2014 20:14:46 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master dadb6509a -> cdfac3fc5


Compute task host attribute aggregates once when scheduling tasks.

Reviewed at https://reviews.apache.org/r/18157/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/cdfac3fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/cdfac3fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/cdfac3fc

Branch: refs/heads/master
Commit: cdfac3fc5b789a451e5fb68223e93f2489e5a931
Parents: dadb650
Author: Bill Farner <wfarner@apache.org>
Authored: Wed Feb 19 11:49:29 2014 -0800
Committer: Bill Farner <wfarner@apache.org>
Committed: Wed Feb 19 11:49:29 2014 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/async/AsyncModule.java     |   4 +-
 .../aurora/scheduler/async/Preemptor.java       |  16 +-
 .../aurora/scheduler/async/TaskScheduler.java   |  41 +++--
 .../events/NotifyingSchedulingFilter.java       |   4 +-
 .../scheduler/filter/AttributeAggregate.java    | 137 ++++++++++++++
 .../scheduler/filter/AttributeFilter.java       |  33 ++--
 .../aurora/scheduler/filter/CachedJobState.java |  71 --------
 .../scheduler/filter/ConstraintFilter.java      |  47 ++---
 .../scheduler/filter/SchedulingFilter.java      |   4 +-
 .../scheduler/filter/SchedulingFilterImpl.java  |  26 +--
 .../aurora/scheduler/state/TaskAssigner.java    |  13 +-
 .../scheduler/async/PreemptorImplTest.java      |  16 +-
 .../scheduler/async/TaskSchedulerImplTest.java  |  28 +--
 .../scheduler/async/TaskSchedulerTest.java      |  47 ++---
 .../events/NotifyingSchedulingFilterTest.java   |  19 +-
 .../filter/AttributeAggregateTest.java          | 177 +++++++++++++++++++
 .../filter/SchedulingFilterImplTest.java        |  52 +++---
 .../scheduler/state/TaskAssignerImplTest.java   |  18 +-
 18 files changed, 495 insertions(+), 258 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/cdfac3fc/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index e7a5a83..a4a049e 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -49,7 +49,7 @@ import org.apache.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculat
 import org.apache.aurora.scheduler.async.TaskGroups.TaskGroupsSettings;
 import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
-import org.apache.aurora.scheduler.filter.CachedJobState;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
 
 import static java.lang.annotation.ElementType.FIELD;
 import static java.lang.annotation.ElementType.METHOD;
@@ -134,7 +134,7 @@ public class AsyncModule extends AbstractModule {
     @Override
     public Optional<String> findPreemptionSlotFor(
         String taskId,
-        CachedJobState cachedJobState) {
+        AttributeAggregate attributeAggregate) {
 
       return Optional.absent();
     }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/cdfac3fc/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
index 7893e55..c3bf8c9 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
@@ -48,7 +48,7 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.filter.CachedJobState;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
@@ -76,10 +76,10 @@ public interface Preemptor {
    * Preempts active tasks in favor of the input task.
    *
    * @param taskId ID of the preempting task.
-   * @param cachedJobState Cached information about the job containing {@code taskId}.
+   * @param attributeAggregate Attribute information for tasks in the job containing {@code task}.
    * @return ID of the slave where preemption occured.
    */
-  Optional<String> findPreemptionSlotFor(String taskId, CachedJobState cachedJobState);
+  Optional<String> findPreemptionSlotFor(String taskId, AttributeAggregate attributeAggregate);
 
   /**
    * A task preemptor that tries to find tasks that are waiting to be scheduled, which are of higher
@@ -225,7 +225,7 @@ public interface Preemptor {
         Iterable<IAssignedTask> possibleVictims,
         Iterable<Offer> offers,
         IAssignedTask pendingTask,
-        CachedJobState cachedJobState) {
+        AttributeAggregate attributeAggregate) {
 
       // This enforces the precondition that all of the resources are from the same host. We need to
       // get the host for the schedulingFilter.
@@ -244,7 +244,7 @@ public interface Preemptor {
             host,
             pendingTask.getTask(),
             pendingTask.getTaskId(),
-            cachedJobState);
+            attributeAggregate);
 
         if (vetos.isEmpty()) {
           return Optional.<Set<IAssignedTask>>of(ImmutableSet.<IAssignedTask>of());
@@ -274,7 +274,7 @@ public interface Preemptor {
             host,
             pendingTask.getTask(),
             pendingTask.getTaskId(),
-            cachedJobState);
+            attributeAggregate);
 
         if (vetos.isEmpty()) {
           return Optional.<Set<IAssignedTask>>of(ImmutableSet.copyOf(toPreemptTasks));
@@ -305,7 +305,7 @@ public interface Preemptor {
     @Override
     public synchronized Optional<String> findPreemptionSlotFor(
         String taskId,
-        CachedJobState cachedJobState) {
+        AttributeAggregate attributeAggregate) {
 
       List<IAssignedTask> pendingTasks =
           fetch(Query.statusScoped(PENDING).byId(taskId), isIdleTask);
@@ -339,7 +339,7 @@ public interface Preemptor {
             slavesToActiveTasks.get(slaveID),
             slavesToOffers.get(slaveID),
             pendingTask,
-            cachedJobState);
+            attributeAggregate);
 
         if (toPreemptTasks.isPresent()) {
           for (IAssignedTask toPreempt : toPreemptTasks.get()) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/cdfac3fc/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index 61385c6..263235c 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -47,13 +47,13 @@ import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.filter.CachedJobState;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.mesos.Protos.Offer;
@@ -134,7 +134,7 @@ interface TaskScheduler extends EventSubscriber {
     }
 
     private Function<Offer, Optional<TaskInfo>> getAssignerFunction(
-        final CachedJobState cachedJobState,
+        final AttributeAggregate attributeAggregate,
         final String taskId,
         final IScheduledTask task) {
 
@@ -145,14 +145,14 @@ interface TaskScheduler extends EventSubscriber {
           if (reservedTaskId.isPresent()) {
             if (taskId.equals(reservedTaskId.get())) {
               // Slave is reserved to satisfy this task.
-              return assigner.maybeAssign(offer, task, cachedJobState);
+              return assigner.maybeAssign(offer, task, attributeAggregate);
             } else {
               // Slave is reserved for another task.
               return Optional.absent();
             }
           } else {
             // Slave is not reserved.
-            return assigner.maybeAssign(offer, task, cachedJobState);
+            return assigner.maybeAssign(offer, task, attributeAggregate);
           }
         }
       };
@@ -167,13 +167,18 @@ interface TaskScheduler extends EventSubscriber {
       return Query.jobScoped(jobKey).byStatus(Tasks.SLAVE_ASSIGNED_STATES);
     }
 
-    private CachedJobState getJobState(final TaskStore store, final IJobKey jobKey) {
-      return new CachedJobState(Suppliers.memoize(new Supplier<ImmutableSet<IScheduledTask>>() {
-        @Override
-        public ImmutableSet<IScheduledTask> get() {
-          return store.fetchTasks(activeJobStateQuery(jobKey));
-        }
-      }));
+    private AttributeAggregate getJobState(
+        final StoreProvider storeProvider,
+        final IJobKey jobKey) {
+
+      Supplier<ImmutableSet<IScheduledTask>> taskSupplier = Suppliers.memoize(
+          new Supplier<ImmutableSet<IScheduledTask>>() {
+            @Override
+            public ImmutableSet<IScheduledTask> get() {
+              return storeProvider.getTaskStore().fetchTasks(activeJobStateQuery(jobKey));
+            }
+          });
+      return new AttributeAggregate(taskSupplier, storeProvider.getAttributeStore());
     }
 
     @Timed("task_schedule_attempt")
@@ -191,12 +196,12 @@ interface TaskScheduler extends EventSubscriber {
             if (task == null) {
               LOG.warning("Failed to look up task " + taskId + ", it may have been deleted.");
             } else {
-              final CachedJobState cachedJobState =
-                  getJobState(store.getTaskStore(), Tasks.SCHEDULED_TO_JOB_KEY.apply(task));
+              AttributeAggregate aggregate =
+                  getJobState(store, Tasks.SCHEDULED_TO_JOB_KEY.apply(task));
               try {
-                if (!offerQueue.launchFirst(getAssignerFunction(cachedJobState, taskId, task))) {
+                if (!offerQueue.launchFirst(getAssignerFunction(aggregate, taskId, task))) {
                   // Task could not be scheduled.
-                  maybePreemptFor(taskId, cachedJobState);
+                  maybePreemptFor(taskId, aggregate);
                   return TaskSchedulerResult.TRY_AGAIN;
                 }
               } catch (OfferQueue.LaunchException e) {
@@ -224,11 +229,11 @@ interface TaskScheduler extends EventSubscriber {
       }
     }
 
-    private void maybePreemptFor(String taskId, CachedJobState cachedJobState) {
+    private void maybePreemptFor(String taskId, AttributeAggregate attributeAggregate) {
       if (reservations.hasReservationForTask(taskId)) {
         return;
       }
-      Optional<String> slaveId = preemptor.findPreemptionSlotFor(taskId, cachedJobState);
+      Optional<String> slaveId = preemptor.findPreemptionSlotFor(taskId, attributeAggregate);
       if (slaveId.isPresent()) {
         this.reservations.add(SlaveID.newBuilder().setValue(slaveId.get()).build(), taskId);
       }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/cdfac3fc/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
index d2be7ac..e946191 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
@@ -25,7 +25,7 @@ import com.google.inject.BindingAnnotation;
 
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
-import org.apache.aurora.scheduler.filter.CachedJobState;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
@@ -66,7 +66,7 @@ class NotifyingSchedulingFilter implements SchedulingFilter {
       String slaveHost,
       ITaskConfig task,
       String taskId,
-      CachedJobState jobState) {
+      AttributeAggregate jobState) {
 
     Set<Veto> vetoes = delegate.filter(offer, slaveHost, task, taskId, jobState);
     if (!vetoes.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/cdfac3fc/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
new file mode 100644
index 0000000..b3b4c27
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
@@ -0,0 +1,137 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.filter;
+
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.AtomicLongMap;
+import com.twitter.common.collections.Pair;
+
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A temporary view of a job's state. Once constructed, instances of this class should be discarded
+ * once the job state may change (e.g. after exiting a write transaction). This is intended to
+ * capture job state once and avoid redundant queries.
+ * <p>
+ * Note that while the state injected into this class is used lazily (to allow for queries to happen
+ * only on-demand), calling {@link #equals(Object)} and {@link #hashCode()} rely on the aggregation
+ * result, thus invoking the {@link Supplier} and {@link AttributeStore}.
+ */
+public class AttributeAggregate {
+
+  /**
+   * A lazily-computed mapping from attribute name and value to the count of tasks with that
+   * name/value combination.  See doc for {@link #getNumTasksWithAttribute(String, String)} for
+   * further details.
+   */
+  private final Supplier<Map<Pair<String, String>, Long>> aggregate;
+
+  /**
+   * Creates a new attribute aggregate, which will be computed from the provided external state.
+   *
+   * @param activeTaskSupplier Supplier of active tasks within the aggregated scope.
+   * @param attributeStore Source of host attributes to associate with tasks.
+   */
+  public AttributeAggregate(
+      final Supplier<ImmutableSet<IScheduledTask>> activeTaskSupplier,
+      final AttributeStore attributeStore) {
+
+    checkNotNull(activeTaskSupplier);
+    checkNotNull(attributeStore);
+
+    final Function<IScheduledTask, Iterable<Attribute>> getHostAttributes =
+        new Function<IScheduledTask, Iterable<Attribute>>() {
+          @Override
+          public Iterable<Attribute> apply(IScheduledTask task) {
+            // Note: this assumes we have access to attributes for hosts where all active tasks
+            // reside.
+            String host = checkNotNull(task.getAssignedTask().getSlaveHost());
+            return attributeStore.getHostAttributes(host).get().getAttributes();
+          }
+        };
+
+    aggregate = Suppliers.memoize(new Supplier<Map<Pair<String, String>, Long>>() {
+      @Override
+      public Map<Pair<String, String>, Long> get() {
+        AtomicLongMap<Pair<String, String>> counts = AtomicLongMap.create();
+        Iterable<Attribute> allAttributes =
+            Iterables.concat(Iterables.transform(activeTaskSupplier.get(), getHostAttributes));
+        for (Attribute attribute : allAttributes) {
+          for (String value : attribute.getValues()) {
+            counts.incrementAndGet(Pair.of(attribute.getName(), value));
+          }
+        }
+
+        return ImmutableMap.copyOf(counts.asMap());
+      }
+    });
+  }
+
+  /**
+   * Gets the total number of tasks with a given attribute name and value combination.
+   * <p>
+   * For example, the counts for a group of tasks that are host-diverse but not rack-diverse, you
+   * may have counts like this:
+   * <pre>
+   * {
+   *   ("host", "hostA"): 1
+   *   ("host", "hostB"): 1
+   *   ("rack", "rackA"): 2
+   * }
+   * </pre>
+   *
+   * @param name Name of the attribute.
+   * @param value Value of the attribute.
+   * @return Number of tasks in the job whose hosts have the provided attribute name and value.
+   */
+  public long getNumTasksWithAttribute(String name, String value) {
+    return Optional.fromNullable(aggregate.get().get(Pair.of(name, value)))
+        .or(0L);
+  }
+
+  @VisibleForTesting
+  Map<Pair<String, String>, Long> getAggregates() {
+    return aggregate.get();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof AttributeAggregate)) {
+      return false;
+    }
+
+    AttributeAggregate other = (AttributeAggregate) o;
+    return getAggregates().equals(other.getAggregates());
+  }
+
+  @Override
+  public int hashCode() {
+    return getAggregates().hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/cdfac3fc/src/main/java/org/apache/aurora/scheduler/filter/AttributeFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/AttributeFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/AttributeFilter.java
index 3e74135..5f1e431 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/AttributeFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/AttributeFilter.java
@@ -18,14 +18,12 @@ package org.apache.aurora.scheduler.filter;
 import java.util.Set;
 
 import com.google.common.base.Function;
-import com.google.common.base.Predicate;
+import com.google.common.base.Optional;
 import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
 import org.apache.aurora.gen.Attribute;
-import org.apache.aurora.scheduler.filter.SchedulingFilterImpl.AttributeLoader;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.IValueConstraint;
 
 /**
@@ -48,13 +46,13 @@ final class AttributeFilter {
   /**
    * Tests whether a constraint is satisfied by attributes.
    *
-   * @param attributes Host attributes.
+   * @param attribute Host attribute.
    * @param constraint Constraint to match.
    * @return {@code true} if the attribute satisfies the constraint, {@code false} otherwise.
    */
-  static boolean matches(Set<Attribute> attributes, IValueConstraint constraint) {
+  static boolean matches(Optional<Attribute> attribute, IValueConstraint constraint) {
     Set<String> allAttributes =
-        ImmutableSet.copyOf(Iterables.concat(Iterables.transform(attributes, GET_VALUES)));
+        ImmutableSet.copyOf(attribute.transform(GET_VALUES).or(ImmutableSet.<String>of()));
     boolean match = Iterables.any(constraint.getValues(), Predicates.in(allAttributes));
     return constraint.isNegated() ^ match;
   }
@@ -62,26 +60,21 @@ final class AttributeFilter {
   /**
    * Tests whether an attribute matches a limit constraint.
    *
-   * @param attributes Attributes to match against.
+   * @param attribute Attribute to match against.
    * @param limit Limit value.
-   * @param activeTasks All active tasks in the system.
-   * @param attributeFetcher Interface for fetching attributes for hosts in the system.
+   * @param attributeAggregate Cached state of the job being filtered.
    * @return {@code true} if the limit constraint is satisfied, {@code false} otherwise.
    */
-  static boolean matches(final Set<Attribute> attributes,
+  static boolean matches(final Attribute attribute,
       int limit,
-      Iterable<IScheduledTask> activeTasks,
-      final AttributeLoader attributeFetcher) {
+      AttributeAggregate attributeAggregate) {
 
-    Predicate<IScheduledTask> hasAttribute = new Predicate<IScheduledTask>() {
-      @Override
-      public boolean apply(IScheduledTask task) {
-        Iterable<Attribute> hostAttributes =
-            attributeFetcher.apply(task.getAssignedTask().getSlaveHost());
-        return Iterables.any(hostAttributes, Predicates.in(attributes));
+    for (String value : attribute.getValues()) {
+      if (limit <= attributeAggregate.getNumTasksWithAttribute(attribute.getName(), value)) {
+        return false;
       }
-    };
+    }
 
-    return limit > Iterables.size(Iterables.filter(activeTasks, hasAttribute));
+    return true;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/cdfac3fc/src/main/java/org/apache/aurora/scheduler/filter/CachedJobState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/CachedJobState.java b/src/main/java/org/apache/aurora/scheduler/filter/CachedJobState.java
deleted file mode 100644
index 337a5e7..0000000
--- a/src/main/java/org/apache/aurora/scheduler/filter/CachedJobState.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Copyright 2014 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.filter;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-
-/**
- * A temporary view of a job's state.  Once constructed, instances of this class should be discarded
- * once the job state may change (e.g. after exiting a write transaction).  This is intended to
- * capture job state once and avoid redundant queries.
- * <p>
- * Note that while the state injected into this class is lazy (to allow for queries to happen only
- * on-demand), calling {@link #equals(Object)} and {@link #hashCode()} rely on the result of
- * {@link #getActiveTasks()}, thus invoking the {@link Supplier}.
- * <p>
- * TODO(wfarner): Take this caching one step further and don't store tasks here, but instead store
- * pre-computed aggregates of host attributes.  For example, translate each IScheduledTask into
- * the HostAttributes of the machine it resides on, and count the number of times each attribute is
- * seen.  This could be represented in a <pre>Map&lt;String, Multiset&lt;String&gt;&gt;</pre>, where
- * the outer key is the attribute name, and the inner key is attribute values.  So, for a job with
- * two tasks on the same rack but different hosts, you could have this aggregate:
- * <pre>
- * { "host": {"hostA": 1, "hostB": 1},
- *   "rack": {"rack1": 2}
- * }
- * </pre>
- */
-public class CachedJobState {
-
-  private final Supplier<ImmutableSet<IScheduledTask>> activeTaskSupplier;
-
-  public CachedJobState(Supplier<ImmutableSet<IScheduledTask>> activeTaskSupplier) {
-    this.activeTaskSupplier = Preconditions.checkNotNull(activeTaskSupplier);
-  }
-
-  public ImmutableSet<IScheduledTask> getActiveTasks() {
-    return activeTaskSupplier.get();
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (!(o instanceof CachedJobState)) {
-      return false;
-    }
-
-    CachedJobState other = (CachedJobState) o;
-    return getActiveTasks().equals(other.getActiveTasks());
-  }
-
-  @Override
-  public int hashCode() {
-    return getActiveTasks().hashCode();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/cdfac3fc/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java
index c2b9f04..2360b1a 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java
@@ -15,20 +15,14 @@
  */
 package org.apache.aurora.scheduler.filter;
 
-import java.util.Set;
-import java.util.logging.Logger;
-
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
 import org.apache.aurora.gen.Attribute;
 import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
-import org.apache.aurora.scheduler.filter.SchedulingFilterImpl.AttributeLoader;
 import org.apache.aurora.scheduler.storage.entities.IConstraint;
 import org.apache.aurora.scheduler.storage.entities.ITaskConstraint;
 
@@ -37,28 +31,18 @@ import static com.google.common.base.Preconditions.checkNotNull;
 /**
  * Filter that determines whether a task's constraints are satisfied.
  */
-class ConstraintFilter implements Function<IConstraint, Optional<Veto>> {
-
-  private static final Logger LOG = Logger.getLogger(ConstraintFilter.class.getName());
-
-  private final CachedJobState cachedjobState;
-  private final AttributeLoader attributeLoader;
+class ConstraintFilter {
+  private final AttributeAggregate cachedjobState;
   private final Iterable<Attribute> hostAttributes;
 
   /**
    * Creates a new constraint filer for a given job.
    *
    * @param cachedjobState Cached information about the job containing the task being matched.
-   * @param attributeLoader Interface to fetch host attributes (if necessary).
    * @param hostAttributes The attributes of the host to test against.
    */
-  ConstraintFilter(
-      CachedJobState cachedjobState,
-      AttributeLoader attributeLoader,
-      Iterable<Attribute> hostAttributes) {
-
+  ConstraintFilter(AttributeAggregate cachedjobState, Iterable<Attribute> hostAttributes) {
     this.cachedjobState = checkNotNull(cachedjobState);
-    this.attributeLoader = checkNotNull(attributeLoader);
     this.hostAttributes = checkNotNull(hostAttributes);
   }
 
@@ -77,36 +61,41 @@ class ConstraintFilter implements Function<IConstraint, Optional<Veto>> {
     return new Veto("Host " + reason + " for maintenance", Veto.MAX_SCORE);
   }
 
-  @Override
-  public Optional<Veto> apply(IConstraint constraint) {
-    Set<Attribute> attributes =
-        ImmutableSet.copyOf(Iterables.filter(hostAttributes, new NameFilter(constraint.getName())));
+  /**
+   * Gets the veto (if any) for a scheduling constraint based on the {@link AttributeAggregate} this
+   * filter was created with.
+   *
+   * @param constraint Scheduling filter to check.
+   * @return A veto if the constraint is not satisfied based on the existing state of the job.
+   */
+  Optional<Veto> getVeto(IConstraint constraint) {
+    // Per TODO in api.thrift, we expect host attributes to have unique names.
+    Optional<Attribute> attribute = Optional.fromNullable(Iterables.getOnlyElement(
+        Iterables.filter(hostAttributes, new NameFilter(constraint.getName())), null));
 
     ITaskConstraint taskConstraint = constraint.getConstraint();
     switch (taskConstraint.getSetField()) {
       case VALUE:
         boolean matches =
-            AttributeFilter.matches(attributes, taskConstraint.getValue());
+            AttributeFilter.matches(attribute, taskConstraint.getValue());
         return matches
             ? Optional.<Veto>absent()
             : Optional.of(mismatchVeto(constraint.getName()));
 
       case LIMIT:
-        if (attributes.isEmpty()) {
+        if (!attribute.isPresent()) {
           return Optional.of(mismatchVeto(constraint.getName()));
         }
 
         boolean satisfied = AttributeFilter.matches(
-            attributes,
+            attribute.get(),
             taskConstraint.getLimit().getLimit(),
-            cachedjobState.getActiveTasks(),
-            attributeLoader);
+            cachedjobState);
         return satisfied
             ? Optional.<Veto>absent()
             : Optional.of(limitVeto(constraint.getName()));
 
       default:
-        LOG.warning("Unrecognized constraint type: " + taskConstraint.getSetField());
         throw new SchedulerException("Failed to recognize the constraint type: "
             + taskConstraint.getSetField());
     }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/cdfac3fc/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
index ddb34cf..f428be4 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
@@ -109,7 +109,7 @@ public interface SchedulingFilter {
    * @param slaveHost Host that the resources are associated with.
    * @param task Task.
    * @param taskId Canonical ID of the task.
-   * @param cachedJobState Cached information about the job containing {@code task}.
+   * @param attributeAggregate Attribute information for tasks in the job containing {@code task}.
    * @return A set of vetoes indicating reasons the task cannot be scheduled.  If the task may be
    *    scheduled, the set will be empty.
    */
@@ -118,5 +118,5 @@ public interface SchedulingFilter {
       String slaveHost,
       ITaskConfig task,
       String taskId,
-      CachedJobState cachedJobState);
+      AttributeAggregate attributeAggregate);
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/cdfac3fc/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
index 93355c3..7c6e9ba 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
@@ -84,11 +84,6 @@ public class SchedulingFilterImpl implements SchedulingFilter {
   }
 
   /**
-   * A function that fetches attributes associated with a given host.
-   */
-  public interface AttributeLoader extends Function<String, Iterable<Attribute>> { }
-
-  /**
    * A function that may veto a task.
    */
   private interface FilterRule extends Function<ITaskConfig, Iterable<Veto>> { }
@@ -196,7 +191,10 @@ public class SchedulingFilterImpl implements SchedulingFilter {
         }
       });
 
-  private FilterRule getConstraintFilter(final CachedJobState jobState, final String slaveHost) {
+  private FilterRule getConstraintFilter(
+      final AttributeAggregate jobState,
+      final String slaveHost) {
+
     return new FilterRule() {
       @Override
       public Iterable<Veto> apply(final ITaskConfig task) {
@@ -211,20 +209,12 @@ public class SchedulingFilterImpl implements SchedulingFilter {
         return storage.weaklyConsistentRead(new Quiet<Iterable<Veto>>() {
           @Override
           public Iterable<Veto> apply(final StoreProvider storeProvider) {
-            AttributeLoader attributeLoader = new AttributeLoader() {
-              @Override
-              public Iterable<Attribute> apply(String host) {
-                return AttributeStore.Util.attributesOrNone(storeProvider, host);
-              }
-            };
-
             ConstraintFilter constraintFilter = new ConstraintFilter(
                 jobState,
-                attributeLoader,
-                attributeLoader.apply(slaveHost));
+                AttributeStore.Util.attributesOrNone(storeProvider, slaveHost));
             ImmutableList.Builder<Veto> vetoes = ImmutableList.builder();
             for (IConstraint constraint : VALUES_FIRST.sortedCopy(task.getConstraints())) {
-              Optional<Veto> veto = constraintFilter.apply(constraint);
+              Optional<Veto> veto = constraintFilter.getVeto(constraint);
               if (veto.isPresent()) {
                 vetoes.add(veto.get());
                 if (isValueConstraint(constraint)) {
@@ -275,13 +265,13 @@ public class SchedulingFilterImpl implements SchedulingFilter {
       String slaveHost,
       ITaskConfig task,
       String taskId,
-      CachedJobState cachedJobState) {
+      AttributeAggregate attributeAggregate) {
 
     if (!ConfigurationManager.isDedicated(task) && isDedicated(slaveHost)) {
       return ImmutableSet.of(DEDICATED_HOST_VETO);
     }
     return ImmutableSet.<Veto>builder()
-        .addAll(getConstraintFilter(cachedJobState, slaveHost).apply(task))
+        .addAll(getConstraintFilter(attributeAggregate, slaveHost).apply(task))
         .addAll(getResourceVetoes(offer, task))
         .addAll(getMaintenanceVeto(slaveHost).asSet())
         .build();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/cdfac3fc/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index 3154ef0..7814315 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -26,7 +26,7 @@ import org.apache.aurora.scheduler.MesosTaskFactory;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.Resources;
-import org.apache.aurora.scheduler.filter.CachedJobState;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
@@ -47,10 +47,13 @@ public interface TaskAssigner {
    *
    * @param offer The resource offer.
    * @param task The task to match against and optionally assign.
-   * @param cachedJobState Cached information about the job containing {@code task}.
+   * @param attributeAggregate Attribute information for tasks in the job containing {@code task}.
    * @return Instructions for launching the task if matching and assignment were successful.
    */
-  Optional<TaskInfo> maybeAssign(Offer offer, IScheduledTask task, CachedJobState cachedJobState);
+  Optional<TaskInfo> maybeAssign(
+      Offer offer,
+      IScheduledTask task,
+      AttributeAggregate attributeAggregate);
 
   class TaskAssignerImpl implements TaskAssigner {
     private static final Logger LOG = Logger.getLogger(TaskAssignerImpl.class.getName());
@@ -88,14 +91,14 @@ public interface TaskAssigner {
     public Optional<TaskInfo> maybeAssign(
         Offer offer,
         IScheduledTask task,
-        CachedJobState cachedJobState) {
+        AttributeAggregate attributeAggregate) {
 
       Set<Veto> vetoes = filter.filter(
           ResourceSlot.from(offer),
           offer.getHostname(),
           task.getAssignedTask().getTask(),
           Tasks.id(task),
-          cachedJobState);
+          attributeAggregate);
       if (vetoes.isEmpty()) {
         return Optional.of(assign(offer, task));
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/cdfac3fc/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
index dc371e0..24a35bf 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
@@ -45,11 +45,12 @@ import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.Resources;
-import org.apache.aurora.scheduler.filter.CachedJobState;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
 import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
@@ -95,15 +96,13 @@ public class PreemptorImplTest extends EasyMockTest {
 
   private static final Amount<Long, Time> PREEMPTION_DELAY = Amount.of(30L, Time.SECONDS);
 
-  private static final CachedJobState EMPTY_JOB =
-      new CachedJobState(Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()));
-
   private StorageTestUtil storageUtil;
   private StateManager stateManager;
   private SchedulingFilter schedulingFilter;
   private FakeClock clock;
   private MaintenanceController maintenance;
   private OfferQueue offerQueue;
+  private AttributeAggregate emptyJob;
 
   @Before
   public void setUp() {
@@ -113,6 +112,9 @@ public class PreemptorImplTest extends EasyMockTest {
     maintenance = createMock(MaintenanceController.class);
     clock = new FakeClock();
     offerQueue = createMock(OfferQueue.class);
+    emptyJob = new AttributeAggregate(
+        Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
+        createMock(AttributeStore.class));
   }
 
   private void runPreemptor(ScheduledTask pendingTask) {
@@ -124,7 +126,7 @@ public class PreemptorImplTest extends EasyMockTest {
         PREEMPTION_DELAY,
         clock);
 
-    preemptor.findPreemptionSlotFor(pendingTask.getAssignedTask().getTaskId(), EMPTY_JOB);
+    preemptor.findPreemptionSlotFor(pendingTask.getAssignedTask().getTaskId(), emptyJob);
   }
 
   // TODO(zmanji): Put together a SchedulerPreemptorIntegrationTest as well.
@@ -511,7 +513,7 @@ public class PreemptorImplTest extends EasyMockTest {
 
     assertEquals(
         Optional.<String>absent(),
-        preemptor.findPreemptionSlotFor(pending.getAssignedTask().getTaskId(), EMPTY_JOB));
+        preemptor.findPreemptionSlotFor(pending.getAssignedTask().getTaskId(), emptyJob));
   }
 
   // TODO(zmanji) spread tasks across slave ids on the same host and see if preemption fails.
@@ -548,7 +550,7 @@ public class PreemptorImplTest extends EasyMockTest {
         EasyMock.eq(HOST_A),
         EasyMock.<ITaskConfig>anyObject(),
         EasyMock.<String>anyObject(),
-        EasyMock.eq(EMPTY_JOB))).andAnswer(
+        EasyMock.eq(emptyJob))).andAnswer(
         new IAnswer<Set<Veto>>() {
           @Override
           public Set<Veto> answer() {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/cdfac3fc/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index 9464783..65e00f7 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -37,10 +37,11 @@ import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.filter.CachedJobState;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.state.PubsubTestUtil;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;
+import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
@@ -67,9 +68,6 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   private static final IScheduledTask TASK_B = makeTask("b");
   private static final Offer OFFER = Offers.makeOffer("OFFER_A", "HOST_A");
 
-  private static final CachedJobState EMPTY_JOB =
-      new CachedJobState(Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()));
-
   private StorageTestUtil storageUtil;
   private StateManager stateManager;
   private TaskAssigner assigner;
@@ -80,6 +78,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   private Amount<Long, Time> reservationDuration;
   private Amount<Long, Time> halfReservationDuration;
   private EventSink eventSink;
+  private AttributeAggregate emptyJob;
 
   @Before
   public void setUp() throws Exception {
@@ -96,6 +95,9 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     Injector injector = getInjector(storageUtil.storage);
     scheduler = injector.getInstance(TaskScheduler.class);
     eventSink = PubsubTestUtil.startPubsub(injector);
+    emptyJob = new AttributeAggregate(
+        Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
+        createMock(AttributeStore.class));
   }
 
   private Injector getInjector(final Storage storageImpl) {
@@ -121,7 +123,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   }
 
   private void expectAssigned(IScheduledTask task) {
-    expect(assigner.maybeAssign(OFFER, task, EMPTY_JOB))
+    expect(assigner.maybeAssign(OFFER, task, emptyJob))
         .andReturn(Optional.of(TaskInfo.getDefaultInstance()));
   }
 
@@ -133,13 +135,13 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expectActiveJobFetch(TASK_A);
     expectLaunchAttempt(false);
     // Reserve "a" with offerA
-    expect(preemptor.findPreemptionSlotFor("a", EMPTY_JOB))
+    expect(preemptor.findPreemptionSlotFor("a", emptyJob))
         .andReturn(Optional.of(OFFER.getSlaveId().getValue()));
 
     expectTaskStillPendingQuery(TASK_B);
     expectActiveJobFetch(TASK_B);
     Capture<Function<Offer, Optional<TaskInfo>>> firstAssignment = expectLaunchAttempt(false);
-    expect(preemptor.findPreemptionSlotFor("b", EMPTY_JOB)).andReturn(Optional.<String>absent());
+    expect(preemptor.findPreemptionSlotFor("b", emptyJob)).andReturn(Optional.<String>absent());
 
     expectTaskStillPendingQuery(TASK_B);
     expectActiveJobFetch(TASK_B);
@@ -168,7 +170,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expectActiveJobFetch(TASK_A);
     expectLaunchAttempt(false);
     // Reserve "a" with offerA
-    expect(preemptor.findPreemptionSlotFor("a", EMPTY_JOB))
+    expect(preemptor.findPreemptionSlotFor("a", emptyJob))
         .andReturn(Optional.of(OFFER.getSlaveId().getValue()));
 
     expectTaskStillPendingQuery(TASK_A);
@@ -181,7 +183,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     Capture<Function<Offer, Optional<TaskInfo>>> secondAssignment = expectLaunchAttempt(true);
 
-    expect(assigner.maybeAssign(OFFER, TASK_B, EMPTY_JOB))
+    expect(assigner.maybeAssign(OFFER, TASK_B, emptyJob))
         .andReturn(Optional.of(TaskInfo.getDefaultInstance()));
 
     control.replay();
@@ -201,7 +203,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expectActiveJobFetch(TASK_A);
     expectLaunchAttempt(false);
     // Reserve "a" with offerA
-    expect(preemptor.findPreemptionSlotFor("a", EMPTY_JOB))
+    expect(preemptor.findPreemptionSlotFor("a", emptyJob))
         .andReturn(Optional.of(OFFER.getSlaveId().getValue()));
 
     expectTaskStillPendingQuery(TASK_A);
@@ -226,7 +228,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expectLaunchAttempt(false);
 
     // Reserve "a" with offerA
-    expect(preemptor.findPreemptionSlotFor("a", EMPTY_JOB))
+    expect(preemptor.findPreemptionSlotFor("a", emptyJob))
         .andReturn(Optional.of(OFFER.getSlaveId().getValue()));
 
     expectTaskStillPendingQuery(TASK_B);
@@ -251,7 +253,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expectActiveJobFetch(TASK_B);
     expectLaunchAttempt(false);
     // Reserve "b" with offer1
-    expect(preemptor.findPreemptionSlotFor("b", EMPTY_JOB))
+    expect(preemptor.findPreemptionSlotFor("b", emptyJob))
         .andReturn(Optional.of(OFFER.getSlaveId().getValue()));
 
     expectTaskStillPendingQuery(TASK_A);
@@ -291,7 +293,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     });
 
     Capture<Function<Offer, Optional<TaskInfo>>> assignment = expectLaunchAttempt(true);
-    expect(assigner.maybeAssign(OFFER, taskA, EMPTY_JOB))
+    expect(assigner.maybeAssign(OFFER, taskA, emptyJob))
         .andReturn(Optional.of(TaskInfo.getDefaultInstance()));
 
     control.replay();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/cdfac3fc/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 4112d61..ce03abc 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -48,10 +48,11 @@ import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.HostMaintenanceStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
-import org.apache.aurora.scheduler.filter.CachedJobState;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;
+import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
@@ -92,10 +93,8 @@ public class TaskSchedulerTest extends EasyMockTest {
   private static final Offer OFFER_C = Offers.makeOffer("OFFER_C", "HOST_C");
   private static final Offer OFFER_D = Offers.makeOffer("OFFER_D", "HOST_D");
 
-  private static final CachedJobState EMPTY_JOB =
-      new CachedJobState(Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()));
-
   private Storage storage;
+
   private MaintenanceController maintenance;
   private StateManager stateManager;
   private TaskAssigner assigner;
@@ -109,6 +108,7 @@ public class TaskSchedulerTest extends EasyMockTest {
   private FakeClock clock;
   private BackoffStrategy flappingStrategy;
   private Preemptor preemptor;
+  private AttributeAggregate emptyJob;
   private Amount<Long, Time> reservationDuration = Amount.of(1L, Time.MINUTES);
 
   @Before
@@ -126,6 +126,9 @@ public class TaskSchedulerTest extends EasyMockTest {
     clock.setNowMillis(0);
     flappingStrategy = createMock(BackoffStrategy.class);
     preemptor = createMock(Preemptor.class);
+    emptyJob = new AttributeAggregate(
+        Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
+        createMock(AttributeStore.class));
   }
 
   private void replayAndCreateScheduler() {
@@ -221,7 +224,7 @@ public class TaskSchedulerTest extends EasyMockTest {
   public void testNoOffers() {
     Capture<Runnable> timeoutCapture = expectTaskGroupBackoff(10);
     expectTaskGroupBackoff(10, 20);
-    expect(preemptor.findPreemptionSlotFor("a", EMPTY_JOB)).andReturn(Optional.<String>absent());
+    expect(preemptor.findPreemptionSlotFor("a", emptyJob)).andReturn(Optional.<String>absent());
 
     replayAndCreateScheduler();
 
@@ -285,16 +288,16 @@ public class TaskSchedulerTest extends EasyMockTest {
     TaskInfo mesosTask = makeTaskInfo(task);
 
     Capture<Runnable> timeoutCapture = expectTaskGroupBackoff(10);
-    expect(assigner.maybeAssign(OFFER_A, task, EMPTY_JOB)).andReturn(Optional.<TaskInfo>absent());
-    expect(preemptor.findPreemptionSlotFor("a", EMPTY_JOB)).andReturn(Optional.<String>absent());
+    expect(assigner.maybeAssign(OFFER_A, task, emptyJob)).andReturn(Optional.<TaskInfo>absent());
+    expect(preemptor.findPreemptionSlotFor("a", emptyJob)).andReturn(Optional.<String>absent());
 
     Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(10, 20);
-    expect(assigner.maybeAssign(OFFER_A, task, EMPTY_JOB)).andReturn(Optional.of(mesosTask));
+    expect(assigner.maybeAssign(OFFER_A, task, emptyJob)).andReturn(Optional.of(mesosTask));
     driver.launchTask(OFFER_A.getId(), mesosTask);
 
     Capture<Runnable> timeoutCapture3 = expectTaskGroupBackoff(10);
     expectTaskGroupBackoff(10, 20);
-    expect(preemptor.findPreemptionSlotFor("b", EMPTY_JOB)).andReturn(Optional.<String>absent());
+    expect(preemptor.findPreemptionSlotFor("b", emptyJob)).andReturn(Optional.<String>absent());
 
     replayAndCreateScheduler();
 
@@ -320,7 +323,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     Capture<Runnable> timeoutCapture = expectTaskGroupBackoff(10);
     expectAnyMaintenanceCalls();
     expectOfferDeclineIn(10);
-    expect(assigner.maybeAssign(OFFER_A, task, EMPTY_JOB)).andReturn(Optional.of(mesosTask));
+    expect(assigner.maybeAssign(OFFER_A, task, emptyJob)).andReturn(Optional.of(mesosTask));
     driver.launchTask(OFFER_A.getId(), mesosTask);
     expectLastCall().andThrow(new IllegalStateException("Driver not ready."));
     expect(stateManager.changeState(
@@ -349,11 +352,11 @@ public class TaskSchedulerTest extends EasyMockTest {
     Capture<Runnable> timeoutCapture = expectTaskGroupBackoff(10);
     expectAnyMaintenanceCalls();
     expectOfferDeclineIn(10);
-    expect(assigner.maybeAssign(OFFER_A, task, EMPTY_JOB))
+    expect(assigner.maybeAssign(OFFER_A, task, emptyJob))
         .andThrow(new StorageException("Injected failure."));
 
     Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(10, 20);
-    expect(assigner.maybeAssign(OFFER_A, task, EMPTY_JOB)).andReturn(Optional.of(mesosTask));
+    expect(assigner.maybeAssign(OFFER_A, task, emptyJob)).andReturn(Optional.of(mesosTask));
     driver.launchTask(OFFER_A.getId(), mesosTask);
     expectLastCall();
 
@@ -372,12 +375,12 @@ public class TaskSchedulerTest extends EasyMockTest {
     Capture<Runnable> timeoutCapture = expectTaskGroupBackoff(10);
     Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
     expectAnyMaintenanceCalls();
-    expect(assigner.maybeAssign(OFFER_A, task, EMPTY_JOB)).andReturn(Optional.<TaskInfo>absent());
+    expect(assigner.maybeAssign(OFFER_A, task, emptyJob)).andReturn(Optional.<TaskInfo>absent());
     Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(10, 20);
-    expect(preemptor.findPreemptionSlotFor("a", EMPTY_JOB)).andReturn(Optional.<String>absent());
+    expect(preemptor.findPreemptionSlotFor("a", emptyJob)).andReturn(Optional.<String>absent());
     driver.declineOffer(OFFER_A.getId());
     expectTaskGroupBackoff(20, 30);
-    expect(preemptor.findPreemptionSlotFor("a", EMPTY_JOB)).andReturn(Optional.<String>absent());
+    expect(preemptor.findPreemptionSlotFor("a", emptyJob)).andReturn(Optional.<String>absent());
 
     replayAndCreateScheduler();
 
@@ -437,13 +440,13 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     IScheduledTask taskA = makeTask("A", PENDING);
     TaskInfo mesosTaskA = makeTaskInfo(taskA);
-    expect(assigner.maybeAssign(OFFER_A, taskA, EMPTY_JOB)).andReturn(Optional.of(mesosTaskA));
+    expect(assigner.maybeAssign(OFFER_A, taskA, emptyJob)).andReturn(Optional.of(mesosTaskA));
     driver.launchTask(OFFER_A.getId(), mesosTaskA);
     Capture<Runnable> captureA = expectTaskGroupBackoff(10);
 
     IScheduledTask taskB = makeTask("B", PENDING);
     TaskInfo mesosTaskB = makeTaskInfo(taskB);
-    expect(assigner.maybeAssign(OFFER_B, taskB, EMPTY_JOB)).andReturn(Optional.of(mesosTaskB));
+    expect(assigner.maybeAssign(OFFER_B, taskB, emptyJob)).andReturn(Optional.of(mesosTaskB));
     driver.launchTask(OFFER_B.getId(), mesosTaskB);
     Capture<Runnable> captureB = expectTaskGroupBackoff(10);
 
@@ -472,13 +475,13 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     IScheduledTask taskA = makeTask("A", PENDING);
     TaskInfo mesosTaskA = makeTaskInfo(taskA);
-    expect(assigner.maybeAssign(OFFER_B, taskA, EMPTY_JOB)).andReturn(Optional.of(mesosTaskA));
+    expect(assigner.maybeAssign(OFFER_B, taskA, emptyJob)).andReturn(Optional.of(mesosTaskA));
     driver.launchTask(OFFER_B.getId(), mesosTaskA);
     Capture<Runnable> captureA = expectTaskGroupBackoff(10);
 
     IScheduledTask taskB = makeTask("B", PENDING);
     TaskInfo mesosTaskB = makeTaskInfo(taskB);
-    expect(assigner.maybeAssign(OFFER_C, taskB, EMPTY_JOB)).andReturn(Optional.of(mesosTaskB));
+    expect(assigner.maybeAssign(OFFER_C, taskB, emptyJob)).andReturn(Optional.of(mesosTaskB));
     driver.launchTask(OFFER_C.getId(), mesosTaskB);
     Capture<Runnable> captureB = expectTaskGroupBackoff(10);
 
@@ -507,7 +510,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     expect(assigner.maybeAssign(
         EasyMock.<Offer>anyObject(),
         capture(taskScheduled),
-        EasyMock.eq(EMPTY_JOB)))
+        EasyMock.eq(emptyJob)))
         .andReturn(Optional.of(mesosTask));
     driver.launchTask(EasyMock.<OfferID>anyObject(), eq(mesosTask));
     return taskScheduled;
@@ -573,9 +576,9 @@ public class TaskSchedulerTest extends EasyMockTest {
     final IScheduledTask task = makeTask("a", PENDING);
 
     Capture<Runnable> timeoutCapture = expectTaskGroupBackoff(10);
-    expect(assigner.maybeAssign(OFFER_A, task, EMPTY_JOB)).andReturn(Optional.<TaskInfo>absent());
+    expect(assigner.maybeAssign(OFFER_A, task, emptyJob)).andReturn(Optional.<TaskInfo>absent());
     expectTaskGroupBackoff(10, 20);
-    expect(preemptor.findPreemptionSlotFor("a", EMPTY_JOB)).andReturn(Optional.<String>absent());
+    expect(preemptor.findPreemptionSlotFor("a", emptyJob)).andReturn(Optional.<String>absent());
 
     replayAndCreateScheduler();
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/cdfac3fc/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
index 4ac059c..807fe85 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
@@ -24,9 +24,10 @@ import com.twitter.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
-import org.apache.aurora.scheduler.filter.CachedJobState;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.junit.Before;
@@ -48,39 +49,39 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
   private static final Veto VETO_1 = new Veto("veto1", 1);
   private static final Veto VETO_2 = new Veto("veto2", 2);
 
-  private static final CachedJobState EMPTY_JOB =
-      new CachedJobState(Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()));
-
   private SchedulingFilter filter;
-
   private EventSink eventSink;
   private SchedulingFilter delegate;
+  private AttributeAggregate emptyJob;
 
   @Before
   public void setUp() {
     delegate = createMock(SchedulingFilter.class);
     eventSink = createMock(EventSink.class);
     filter = new NotifyingSchedulingFilter(delegate, eventSink);
+    emptyJob = new AttributeAggregate(
+        Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
+        createMock(AttributeStore.class));
   }
 
   @Test
   public void testEvents() {
     Set<Veto> vetoes = ImmutableSet.of(VETO_1, VETO_2);
-    expect(delegate.filter(TASK_RESOURCES, SLAVE, TASK, TASK_ID, EMPTY_JOB)).andReturn(vetoes);
+    expect(delegate.filter(TASK_RESOURCES, SLAVE, TASK, TASK_ID, emptyJob)).andReturn(vetoes);
     eventSink.post(new Vetoed(TASK_ID, vetoes));
 
     control.replay();
 
-    assertEquals(vetoes, filter.filter(TASK_RESOURCES, SLAVE, TASK, TASK_ID, EMPTY_JOB));
+    assertEquals(vetoes, filter.filter(TASK_RESOURCES, SLAVE, TASK, TASK_ID, emptyJob));
   }
 
   @Test
   public void testNoVetoes() {
     Set<Veto> vetoes = ImmutableSet.of();
-    expect(delegate.filter(TASK_RESOURCES, SLAVE, TASK, TASK_ID, EMPTY_JOB)).andReturn(vetoes);
+    expect(delegate.filter(TASK_RESOURCES, SLAVE, TASK, TASK_ID, emptyJob)).andReturn(vetoes);
 
     control.replay();
 
-    assertEquals(vetoes, filter.filter(TASK_RESOURCES, SLAVE, TASK, TASK_ID, EMPTY_JOB));
+    assertEquals(vetoes, filter.filter(TASK_RESOURCES, SLAVE, TASK, TASK_ID, emptyJob));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/cdfac3fc/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java b/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java
new file mode 100644
index 0000000..b3531b1
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java
@@ -0,0 +1,177 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.filter;
+
+import java.util.Map;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.twitter.common.collections.Pair;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.easymock.IExpectationSetters;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+public class AttributeAggregateTest extends EasyMockTest {
+
+  private Supplier<ImmutableSet<IScheduledTask>> activeTaskSupplier;
+  private AttributeStore attributeStore;
+  private AttributeAggregate aggregate;
+
+  @Before
+  public void setUp() throws Exception {
+    activeTaskSupplier = createMock(new Clazz<Supplier<ImmutableSet<IScheduledTask>>>() { });
+    attributeStore = createMock(AttributeStore.class);
+    aggregate = new AttributeAggregate(activeTaskSupplier, attributeStore);
+  }
+
+  @Test
+  public void testNoTasks() {
+    expectGetTasks();
+
+    control.replay();
+
+    assertAggregates(ImmutableMap.<Pair<String, String>, Long>of());
+    assertAggregate("none", "alsoNone", 0);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testAttributesMissing() {
+    expectGetTasks(task("1", "a"));
+    expect(attributeStore.getHostAttributes("a")).andReturn(Optional.<HostAttributes>absent());
+
+    control.replay();
+
+    aggregate.getAggregates();
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testTaskWithNoHost() {
+    expectGetTasks(task("1", null));
+
+    control.replay();
+
+    aggregate.getAggregates();
+  }
+
+  @Test
+  public void testNoAttributes() {
+    expectGetTasks(task("1", "hostA"));
+    expectGetAttributes("hostA");
+
+    control.replay();
+
+    assertAggregates(ImmutableMap.<Pair<String, String>, Long>of());
+  }
+
+  @Test
+  public void testAggregate() {
+    expectGetTasks(
+        task("1", "a1"),
+        task("2", "b1"),
+        task("3", "b1"),
+        task("4", "b2"),
+        task("5", "c1"));
+    expectGetAttributes(
+        "a1",
+        attribute("host", "a1"),
+        attribute("rack", "a"),
+        attribute("pdu", "p1"));
+    expectGetAttributes(
+        "b1",
+        attribute("host", "b1"),
+        attribute("rack", "b"),
+        attribute("pdu", "p1", "p2"))
+        .times(2);
+    expectGetAttributes(
+        "b2",
+        attribute("host", "b2"),
+        attribute("rack", "b"),
+        attribute("pdu", "p1", "p2"));
+    expectGetAttributes(
+        "c1",
+        attribute("host", "c1"),
+        attribute("rack", "c"),
+        attribute("pdu", "p2"),
+        attribute("ssd", "true"));
+
+    control.replay();
+
+    Map<Pair<String, String>, Long> expected = ImmutableMap.<Pair<String, String>, Long>builder()
+        .put(Pair.of("rack", "a"), 1L)
+        .put(Pair.of("rack", "b"), 3L)
+        .put(Pair.of("rack", "c"), 1L)
+        .put(Pair.of("host", "a1"), 1L)
+        .put(Pair.of("host", "b1"), 2L)
+        .put(Pair.of("host", "b2"), 1L)
+        .put(Pair.of("host", "c1"), 1L)
+        .put(Pair.of("pdu", "p1"), 4L)
+        .put(Pair.of("pdu", "p2"), 4L)
+        .put(Pair.of("ssd", "true"), 1L)
+        .build();
+    assertAggregates(expected);
+    for (Map.Entry<Pair<String, String>, Long> entry : expected.entrySet()) {
+      assertAggregate(entry.getKey().getFirst(), entry.getKey().getSecond(), entry.getValue());
+    }
+    assertAggregate("host", "c2", 0L);
+    assertAggregate("hostc", "2", 0L);
+  }
+
+  private void expectGetTasks(IScheduledTask... activeTasks) {
+    expect(activeTaskSupplier.get())
+        .andReturn(ImmutableSet.<IScheduledTask>builder().add(activeTasks).build());
+  }
+
+  private IExpectationSetters<?> expectGetAttributes(String host, Attribute... attributes) {
+    return expect(attributeStore.getHostAttributes(host)).andReturn(Optional.of(
+        new HostAttributes()
+            .setHost(host)
+            .setAttributes(ImmutableSet.<Attribute>builder().add(attributes).build())));
+  }
+
+  private void assertAggregates(Map<Pair<String, String>, Long> expected) {
+    assertEquals(expected, aggregate.getAggregates());
+  }
+
+  private void assertAggregate(String name, String value, long expected) {
+    assertEquals(expected, aggregate.getNumTasksWithAttribute(name, value));
+  }
+
+  private static IScheduledTask task(String id, String host) {
+    return IScheduledTask.build(new ScheduledTask().setAssignedTask(
+        new AssignedTask()
+            .setTaskId(id)
+            .setSlaveHost(host)));
+  }
+
+  private Attribute attribute(String name, String... values) {
+    return new Attribute()
+        .setName(name)
+        .setValues(ImmutableSet.<String>builder().add(values).build());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/cdfac3fc/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index 56de3aa..79d14c9 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -101,8 +101,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
       Amount.of(DEFAULT_DISK, Data.MB),
       0);
 
-  private static final CachedJobState EMPTY_JOB =
-      new CachedJobState(Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()));
+  private AttributeAggregate emptyJob;
 
   private final AtomicLong taskIdCounter = new AtomicLong();
 
@@ -119,6 +118,9 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     defaultFilter = new SchedulingFilterImpl(storage, maintenance);
     storeProvider = createMock(StoreProvider.class);
     attributeStore = createMock(AttributeStore.Mutable.class);
+    emptyJob = new AttributeAggregate(
+        Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
+        attributeStore);
 
     // Link the store provider to the store mocks.
     expectReads();
@@ -145,7 +147,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     control.replay();
 
     assertNoVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK));
-    assertNoVetoes(makeTask(DEFAULT_CPUS - 1, DEFAULT_RAM - 1, DEFAULT_DISK - 1), EMPTY_JOB);
+    assertNoVetoes(makeTask(DEFAULT_CPUS - 1, DEFAULT_RAM - 1, DEFAULT_DISK - 1), emptyJob);
   }
 
   @Test
@@ -175,12 +177,12 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         .setRequestedPorts(ImmutableSet.of("one", "two", "three")));
 
     Set<Veto> none = ImmutableSet.of();
-    assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, noPortTask, TASK_ID, EMPTY_JOB));
-    assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, onePortTask, TASK_ID, EMPTY_JOB));
-    assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, twoPortTask, TASK_ID, EMPTY_JOB));
+    assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, noPortTask, TASK_ID, emptyJob));
+    assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, onePortTask, TASK_ID, emptyJob));
+    assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, twoPortTask, TASK_ID, emptyJob));
     assertEquals(
         ImmutableSet.of(PORTS.veto(1)),
-        defaultFilter.filter(twoPorts, HOST_A, threePortTask, TASK_ID, EMPTY_JOB));
+        defaultFilter.filter(twoPorts, HOST_A, threePortTask, TASK_ID, emptyJob));
   }
 
   @Test
@@ -355,15 +357,17 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     expectGetHostMaintenanceStatus(HOST_B).atLeastOnce();
     expectGetHostMaintenanceStatus(HOST_C).atLeastOnce();
 
-    CachedJobState stateA = new CachedJobState(Suppliers.ofInstance(ImmutableSet.of(
+    AttributeAggregate stateA = new AttributeAggregate(Suppliers.ofInstance(ImmutableSet.of(
         makeScheduledTask(OWNER_A, JOB_A, HOST_A),
         makeScheduledTask(OWNER_A, JOB_A, HOST_B),
         makeScheduledTask(OWNER_A, JOB_A, HOST_B),
-        makeScheduledTask(OWNER_A, JOB_A, HOST_C))));
-    CachedJobState stateB = new CachedJobState(Suppliers.ofInstance(ImmutableSet.of(
+        makeScheduledTask(OWNER_A, JOB_A, HOST_C))),
+        attributeStore);
+    AttributeAggregate stateB = new AttributeAggregate(Suppliers.ofInstance(ImmutableSet.of(
         makeScheduledTask(OWNER_B, JOB_A, HOST_A),
         makeScheduledTask(OWNER_B, JOB_A, HOST_A),
-        makeScheduledTask(OWNER_B, JOB_A, HOST_B))));
+        makeScheduledTask(OWNER_B, JOB_A, HOST_B))),
+        attributeStore);
 
     control.replay();
 
@@ -434,7 +438,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     Constraint zoneConstraint = makeConstraint("zone", "c");
 
     ITaskConfig task = makeTask(OWNER_A, JOB_A, jvmConstraint, zoneConstraint);
-    assertTrue(defaultFilter.filter(DEFAULT_OFFER, HOST_A, task, TASK_ID, EMPTY_JOB).isEmpty());
+    assertTrue(defaultFilter.filter(DEFAULT_OFFER, HOST_A, task, TASK_ID, emptyJob).isEmpty());
 
     Constraint jvmNegated = jvmConstraint.deepCopy();
     jvmNegated.getConstraint().getValue().setNegated(true);
@@ -482,7 +486,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
       String value,
       String... vs) {
 
-    return checkConstraint(owner, jobName, EMPTY_JOB, host, constraintName, expected,
+    return checkConstraint(owner, jobName, emptyJob, host, constraintName, expected,
         new ValueConstraint(false,
             ImmutableSet.<String>builder().add(value).addAll(Arrays.asList(vs)).build()));
   }
@@ -490,7 +494,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   private ITaskConfig checkConstraint(
       Identity owner,
       String jobName,
-      CachedJobState cachedJobState,
+      AttributeAggregate aggregate,
       String host,
       String constraintName,
       boolean expected,
@@ -500,38 +504,38 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     ITaskConfig task = makeTask(owner, jobName, constraint);
     assertEquals(
         expected,
-        defaultFilter.filter(DEFAULT_OFFER, host, task, TASK_ID, cachedJobState).isEmpty());
+        defaultFilter.filter(DEFAULT_OFFER, host, task, TASK_ID, aggregate).isEmpty());
 
     Constraint negated = constraint.deepCopy();
     negated.getConstraint().getValue().setNegated(!value.isNegated());
     ITaskConfig negatedTask = makeTask(owner, jobName, negated);
     assertEquals(
         !expected,
-        defaultFilter.filter(DEFAULT_OFFER, host, negatedTask, TASK_ID, cachedJobState).isEmpty());
+        defaultFilter.filter(DEFAULT_OFFER, host, negatedTask, TASK_ID, aggregate).isEmpty());
     return task;
   }
 
   private void assertNoVetoes(ITaskConfig task) {
-    assertNoVetoes(task, EMPTY_JOB);
+    assertNoVetoes(task, emptyJob);
   }
 
-  private void assertNoVetoes(ITaskConfig task, CachedJobState jobState) {
+  private void assertNoVetoes(ITaskConfig task, AttributeAggregate jobState) {
     assertNoVetoes(task, HOST_A, jobState);
   }
 
   private void assertNoVetoes(ITaskConfig task, String host) {
-    assertVetoes(task, host, EMPTY_JOB);
+    assertVetoes(task, host, emptyJob);
   }
 
-  private void assertNoVetoes(ITaskConfig task, String host, CachedJobState jobState) {
+  private void assertNoVetoes(ITaskConfig task, String host, AttributeAggregate jobState) {
     assertVetoes(task, host, jobState);
   }
 
   private void assertVetoes(ITaskConfig task, Veto... vetos) {
-    assertVetoes(task, EMPTY_JOB, vetos);
+    assertVetoes(task, emptyJob, vetos);
   }
 
-  private void assertVetoes(ITaskConfig task, CachedJobState jobState, Veto... vetos) {
+  private void assertVetoes(ITaskConfig task, AttributeAggregate jobState, Veto... vetos) {
     assertVetoes(task, HOST_A, jobState, vetos);
   }
 
@@ -540,13 +544,13 @@ public class SchedulingFilterImplTest extends EasyMockTest {
       String host,
       Veto... vetoes) {
 
-    assertVetoes(task, host, EMPTY_JOB, vetoes);
+    assertVetoes(task, host, emptyJob, vetoes);
   }
 
   private void assertVetoes(
       ITaskConfig task,
       String host,
-      CachedJobState jobState,
+      AttributeAggregate jobState,
       Veto... vetoes) {
 
     assertEquals(

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/cdfac3fc/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
index f0fdd57..b4aeaf2 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
@@ -27,10 +27,11 @@ import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.MesosTaskFactory;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.filter.CachedJobState;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl;
+import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.mesos.Protos.FrameworkID;
 import org.apache.mesos.Protos.Offer;
@@ -75,13 +76,11 @@ public class TaskAssignerImplTest extends EasyMockTest {
       .setSlaveId(OFFER.getSlaveId())
       .build();
 
-  private static final CachedJobState EMPTY_JOB =
-      new CachedJobState(Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()));
-
   private StateManager stateManager;
   private SchedulingFilter filter;
   private MesosTaskFactory taskFactory;
   private TaskAssigner assigner;
+  private AttributeAggregate emptyJob;
 
   @Before
   public void setUp() throws Exception {
@@ -89,6 +88,9 @@ public class TaskAssignerImplTest extends EasyMockTest {
     filter = createMock(SchedulingFilter.class);
     taskFactory = createMock(MesosTaskFactory.class);
     assigner = new TaskAssignerImpl(stateManager, filter, taskFactory);
+    emptyJob = new AttributeAggregate(
+        Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
+        createMock(AttributeStore.class));
   }
 
   @Test
@@ -98,7 +100,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
         OFFER.getHostname(),
         TASK.getAssignedTask().getTask(),
         Tasks.id(TASK),
-        EMPTY_JOB))
+        emptyJob))
         .andReturn(ImmutableSet.<Veto>of());
     expect(stateManager.assignTask(
         Tasks.id(TASK),
@@ -110,7 +112,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertEquals(Optional.of(TASK_INFO), assigner.maybeAssign(OFFER, TASK, EMPTY_JOB));
+    assertEquals(Optional.of(TASK_INFO), assigner.maybeAssign(OFFER, TASK, emptyJob));
   }
 
 
@@ -121,11 +123,11 @@ public class TaskAssignerImplTest extends EasyMockTest {
         OFFER.getHostname(),
         TASK.getAssignedTask().getTask(),
         Tasks.id(TASK),
-        EMPTY_JOB))
+        emptyJob))
         .andReturn(ImmutableSet.of(Veto.constraintMismatch("denied")));
 
     control.replay();
 
-    assertEquals(Optional.<TaskInfo>absent(), assigner.maybeAssign(OFFER, TASK, EMPTY_JOB));
+    assertEquals(Optional.<TaskInfo>absent(), assigner.maybeAssign(OFFER, TASK, emptyJob));
   }
 }


Mime
View raw message