aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [2/2] incubator-aurora git commit: Store host attributes alongside offers to reduce number of lookups.
Date Thu, 13 Nov 2014 00:10:28 GMT
Store host attributes alongside offers to reduce number of lookups.

Bugs closed: AURORA-913

Reviewed at https://reviews.apache.org/r/27902/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/b80e69c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/b80e69c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/b80e69c9

Branch: refs/heads/master
Commit: b80e69c9b9d0e18b9ea1189fd310e6995e4e1fe9
Parents: 2e2c4b3
Author: Bill Farner <wfarner@apache.org>
Authored: Wed Nov 12 16:10:03 2014 -0800
Committer: Bill Farner <wfarner@apache.org>
Committed: Wed Nov 12 16:10:03 2014 -0800

----------------------------------------------------------------------
 .../org/apache/aurora/scheduler/HostOffer.java  |  66 ++++
 .../apache/aurora/scheduler/TaskLauncher.java   |   3 +-
 .../aurora/scheduler/UserTaskLauncher.java      |   3 +-
 .../scheduler/async/GcExecutorLauncher.java     |  18 +-
 .../aurora/scheduler/async/OfferQueue.java      | 125 ++-----
 .../aurora/scheduler/async/Preemptor.java       |  67 ++--
 .../async/RandomJitterReturnDelay.java          |   4 +-
 .../aurora/scheduler/async/TaskScheduler.java   |  10 +-
 .../events/NotifyingSchedulingFilter.java       |   7 +-
 .../scheduler/filter/SchedulingFilter.java      |   7 +-
 .../scheduler/filter/SchedulingFilterImpl.java  |  86 ++---
 .../apache/aurora/scheduler/http/Offers.java    |   4 +-
 .../scheduler/mesos/MesosSchedulerImpl.java     |  44 ++-
 .../scheduler/state/MaintenanceController.java  |   1 +
 .../aurora/scheduler/state/TaskAssigner.java    |  19 +-
 .../scheduler/stats/AsyncStatsModule.java       |   9 +-
 .../aurora/scheduler/UserTaskLauncherTest.java  |  13 +-
 .../scheduler/async/GcExecutorLauncherTest.java |  24 +-
 .../scheduler/async/OfferQueueImplTest.java     | 136 +++-----
 .../scheduler/async/PreemptorImplTest.java      |  53 +--
 .../scheduler/async/TaskSchedulerImplTest.java  |   9 +-
 .../scheduler/async/TaskSchedulerTest.java      | 107 +++---
 .../events/NotifyingSchedulingFilterTest.java   |  14 +-
 .../filter/SchedulingFilterImplTest.java        | 335 +++++++++----------
 .../scheduler/mesos/MesosSchedulerImplTest.java |  64 ++--
 .../scheduler/state/TaskAssignerImplTest.java   |  35 +-
 26 files changed, 625 insertions(+), 638 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/HostOffer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/HostOffer.java b/src/main/java/org/apache/aurora/scheduler/HostOffer.java
new file mode 100644
index 0000000..5056b60
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/HostOffer.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler;
+
+import java.util.Objects;
+
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.mesos.Protos.Offer;
+
+/**
+ * An available resource in the cluster.
+ */
+public class HostOffer {
+  private final Offer offer;
+  private final IHostAttributes hostAttributes;
+
+  public HostOffer(Offer offer, IHostAttributes hostAttributes) {
+    this.offer = requireNonNull(offer);
+    this.hostAttributes = requireNonNull(hostAttributes);
+  }
+
+  public Offer getOffer() {
+    return offer;
+  }
+
+  public IHostAttributes getAttributes() {
+    return hostAttributes;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof HostOffer)) {
+      return false;
+    }
+    HostOffer other = (HostOffer) o;
+    return Objects.equals(offer, other.offer)
+        && Objects.equals(hostAttributes, other.hostAttributes);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(offer, hostAttributes);
+  }
+
+  @Override
+  public String toString() {
+    return com.google.common.base.Objects.toStringHelper(this)
+        .add("offer", offer)
+        .add("hostAttributes", hostAttributes)
+        .toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
index c13520a..cd55a6e 100644
--- a/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
@@ -13,7 +13,6 @@
  */
 package org.apache.aurora.scheduler;
 
-import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.TaskStatus;
 
@@ -33,7 +32,7 @@ public interface TaskLauncher {
    * @return {@code false} if the launcher will not act on the offer, or {@code true} if the
    *         launcher may accept the offer at some point in the future.
    */
-  boolean willUse(Offer offer);
+  boolean willUse(HostOffer offer);
 
   /**
    * Informs the launcher that a status update has been received for a task.  If the task is not

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
index 250c2df..e1b7d05 100644
--- a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
@@ -27,7 +27,6 @@ import org.apache.aurora.scheduler.async.OfferQueue;
 import org.apache.aurora.scheduler.base.Conversions;
 import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.TaskStatus;
 
@@ -56,7 +55,7 @@ class UserTaskLauncher implements TaskLauncher {
   }
 
   @Override
-  public boolean willUse(Offer offer) {
+  public boolean willUse(HostOffer offer) {
     requireNonNull(offer);
 
     offerQueue.addOffer(offer);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
index 79d8d8d..e02921d 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
@@ -41,6 +41,7 @@ import org.apache.aurora.Protobufs;
 import org.apache.aurora.codec.ThriftBinaryCodec;
 import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
 import org.apache.aurora.gen.comm.AdjustRetainedTasks;
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.TaskLauncher;
 import org.apache.aurora.scheduler.base.CommandUtil;
 import org.apache.aurora.scheduler.base.Query;
@@ -52,7 +53,6 @@ import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.ExecutorID;
 import org.apache.mesos.Protos.ExecutorInfo;
-import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.SlaveID;
 import org.apache.mesos.Protos.TaskID;
@@ -192,20 +192,22 @@ public class GcExecutorLauncher implements TaskLauncher {
             Maps.transformValues(Tasks.mapById(tasksOnHost), Tasks.GET_STATUS)));
   }
 
-  private boolean sufficientResources(Offer offer) {
-    boolean sufficient = Resources.from(offer).greaterThanOrEqual(TOTAL_GC_EXECUTOR_RESOURCES);
+  private boolean sufficientResources(HostOffer offer) {
+    boolean sufficient =
+        Resources.from(offer.getOffer()).greaterThanOrEqual(TOTAL_GC_EXECUTOR_RESOURCES);
     if (!sufficient) {
-      LOG.warning("Offer for host " + offer.getHostname() + " is too small for a GC executor");
+      LOG.warning("Offer for host " + offer.getOffer().getHostname()
+          + " is too small for a GC executor");
       insufficientOffers.incrementAndGet();
     }
     return sufficient;
   }
 
   @Override
-  public boolean willUse(final Offer offer) {
+  public boolean willUse(final HostOffer offer) {
     if (!settings.getGcExecutorPath().isPresent()
         || !sufficientResources(offer)
-        || !isTimeToCollect(offer.getHostname())) {
+        || !isTimeToCollect(offer.getOffer().getHostname())) {
 
       return false;
     }
@@ -213,7 +215,9 @@ public class GcExecutorLauncher implements TaskLauncher {
     executor.execute(new Runnable() {
       @Override
       public void run() {
-        driver.launchTask(offer.getId(), makeGcTask(offer.getHostname(), offer.getSlaveId()));
+        driver.launchTask(
+            offer.getOffer().getId(),
+            makeGcTask(offer.getOffer().getHostname(), offer.getOffer().getSlaveId()));
       }
     });
     offersConsumed.incrementAndGet();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
index dd8a900..d2682cd 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
@@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.async;
 
 import java.util.Comparator;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ScheduledExecutorService;
@@ -37,11 +36,11 @@ import com.twitter.common.quantity.Time;
 import com.twitter.common.stats.Stats;
 
 import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.mesos.Driver;
-import org.apache.aurora.scheduler.state.MaintenanceController;
-import org.apache.mesos.Protos.Offer;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.SlaveID;
 import org.apache.mesos.Protos.TaskInfo;
@@ -64,7 +63,7 @@ public interface OfferQueue extends EventSubscriber {
    *
    * @param offer Newly-available resource offer.
    */
-  void addOffer(Offer offer);
+  void addOffer(HostOffer offer);
 
   /**
    * Invalidates an offer.  This indicates that the scheduler should not attempt to match any
@@ -104,7 +103,7 @@ public interface OfferQueue extends EventSubscriber {
    * The delay is calculated for each offer that is received, so the return delay may be
    * fixed or variable.
    */
-  interface OfferReturnDelay extends Supplier<Amount<Integer, Time>> {
+  interface OfferReturnDelay extends Supplier<Amount<Long, Time>> {
   }
 
   /**
@@ -120,51 +119,6 @@ public interface OfferQueue extends EventSubscriber {
     }
   }
 
-  /**
-   * Encapsulate an offer from a host, and the host's maintenance mode.
-   */
-  class HostOffer {
-    private final Offer offer;
-
-    // TODO(wfarner): Replace this with HostAttributes for more use of this caching.
-    private final MaintenanceMode mode;
-
-    public HostOffer(Offer offer, MaintenanceMode mode) {
-      this.offer = requireNonNull(offer);
-      this.mode = requireNonNull(mode);
-    }
-
-    public Offer getOffer() {
-      return offer;
-    }
-
-    public MaintenanceMode getMode() {
-      return mode;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof HostOffer)) {
-        return false;
-      }
-      HostOffer other = (HostOffer) o;
-      return Objects.equals(offer, other.offer) && mode == other.mode;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(offer, mode);
-    }
-
-    @Override
-    public String toString() {
-      return com.google.common.base.Objects.toStringHelper(this)
-          .add("offer", offer)
-          .add("mode", mode)
-          .toString();
-    }
-  }
-
   class OfferQueueImpl implements OfferQueue {
     private static final Logger LOG = Logger.getLogger(OfferQueueImpl.class.getName());
 
@@ -174,41 +128,36 @@ public interface OfferQueue extends EventSubscriber {
     private final Driver driver;
     private final OfferReturnDelay returnDelay;
     private final ScheduledExecutorService executor;
-    private final MaintenanceController maintenance;
 
     @Inject
-    OfferQueueImpl(Driver driver,
-        OfferReturnDelay returnDelay,
-        ScheduledExecutorService executor,
-        MaintenanceController maintenance) {
-
-      this.driver = driver;
-      this.returnDelay = returnDelay;
-      this.executor = executor;
-      this.maintenance = maintenance;
+    OfferQueueImpl(Driver driver, OfferReturnDelay returnDelay, ScheduledExecutorService executor) {
+      this.driver = requireNonNull(driver);
+      this.returnDelay = requireNonNull(returnDelay);
+      this.executor = requireNonNull(executor);
     }
 
     @Override
-    public void addOffer(final Offer offer) {
+    public void addOffer(final HostOffer offer) {
       // We run a slight risk of a race here, which is acceptable.  The worst case is that we
       // temporarily hold two offers for the same host, which should be corrected when we return
       // them after the return delay.
       // There's also a chance that we return an offer for compaction ~simultaneously with the
       // same-host offer being canceled/returned.  This is also fine.
-      Optional<HostOffer> sameSlave = hostOffers.get(offer.getSlaveId());
+      Optional<HostOffer> sameSlave = hostOffers.get(offer.getOffer().getSlaveId());
       if (sameSlave.isPresent()) {
         // If there are existing offers for the slave, decline all of them so the master can
         // compact all of those offers into a single offer and send them back.
-        LOG.info("Returning offers for " + offer.getSlaveId().getValue() + " for compaction.");
-        decline(offer.getId());
-        removeAndDecline(sameSlave.get().offer.getId());
+        LOG.info("Returning offers for " + offer.getOffer().getSlaveId().getValue()
+            + " for compaction.");
+        decline(offer.getOffer().getId());
+        removeAndDecline(sameSlave.get().getOffer().getId());
       } else {
-        hostOffers.add(new HostOffer(offer, maintenance.getMode(offer.getHostname())));
+        hostOffers.add(offer);
         executor.schedule(
             new Runnable() {
               @Override
               public void run() {
-                removeAndDecline(offer.getId());
+                removeAndDecline(offer.getOffer().getId());
               }
             },
             returnDelay.get().as(Time.MILLISECONDS),
@@ -252,7 +201,7 @@ public interface OfferQueue extends EventSubscriber {
      */
     @Subscribe
     public void hostAttributesChanged(HostAttributesChanged change) {
-      hostOffers.updateHostMode(change.getAttributes().getHost(), change.getAttributes().getMode());
+      hostOffers.updateHostAttributes(change.getAttributes());
     }
 
     /**
@@ -280,12 +229,12 @@ public interface OfferQueue extends EventSubscriber {
               .onResultOf(new Function<HostOffer, MaintenanceMode>() {
                 @Override
                 public MaintenanceMode apply(HostOffer offer) {
-                  return offer.mode;
+                  return offer.getAttributes().getMode();
                 }
               })
               .compound(Ordering.arbitrary());
 
-      private final Set<HostOffer> hostOffers = new ConcurrentSkipListSet<>(PREFERENCE_COMPARATOR);
+      private final Set<HostOffer> offers = new ConcurrentSkipListSet<>(PREFERENCE_COMPARATOR);
       private final Map<OfferID, HostOffer> offersById = Maps.newHashMap();
       private final Map<SlaveID, HostOffer> offersBySlave = Maps.newHashMap();
       private final Map<String, HostOffer> offersByHost = Maps.newHashMap();
@@ -293,7 +242,7 @@ public interface OfferQueue extends EventSubscriber {
       HostOffers() {
         // Potential gotcha - since this is a ConcurrentSkipListSet, size() is more expensive.
         // Could track this separately if it turns out to pose problems.
-        Stats.exportSize("outstanding_offers", hostOffers);
+        Stats.exportSize("outstanding_offers", offers);
       }
 
       synchronized Optional<HostOffer> get(SlaveID slaveId) {
@@ -301,37 +250,37 @@ public interface OfferQueue extends EventSubscriber {
       }
 
       synchronized void add(HostOffer offer) {
-        hostOffers.add(offer);
-        offersById.put(offer.offer.getId(), offer);
-        offersBySlave.put(offer.offer.getSlaveId(), offer);
-        offersByHost.put(offer.offer.getHostname(), offer);
+        offers.add(offer);
+        offersById.put(offer.getOffer().getId(), offer);
+        offersBySlave.put(offer.getOffer().getSlaveId(), offer);
+        offersByHost.put(offer.getOffer().getHostname(), offer);
       }
 
       synchronized boolean remove(OfferID id) {
         HostOffer removed = offersById.remove(id);
         if (removed != null) {
-          hostOffers.remove(removed);
-          offersBySlave.remove(removed.offer.getSlaveId());
-          offersByHost.remove(removed.offer.getHostname());
+          offers.remove(removed);
+          offersBySlave.remove(removed.getOffer().getSlaveId());
+          offersByHost.remove(removed.getOffer().getHostname());
         }
         return removed != null;
       }
 
-      synchronized void updateHostMode(String hostName, MaintenanceMode mode) {
-        HostOffer offer = offersByHost.remove(hostName);
+      synchronized void updateHostAttributes(IHostAttributes attributes) {
+        HostOffer offer = offersByHost.remove(attributes.getHost());
         if (offer != null) {
           // Remove and re-add a host's offer to re-sort based on its new hostStatus
-          remove(offer.offer.getId());
-          add(new HostOffer(offer.offer, mode));
+          remove(offer.getOffer().getId());
+          add(new HostOffer(offer.getOffer(),  attributes));
         }
       }
 
       synchronized Iterable<HostOffer> getWeaklyConsistentOffers() {
-        return Iterables.unmodifiableIterable(hostOffers);
+        return Iterables.unmodifiableIterable(offers);
       }
 
       synchronized void clear() {
-        hostOffers.clear();
+        offers.clear();
         offersById.clear();
         offersBySlave.clear();
         offersByHost.clear();
@@ -345,17 +294,17 @@ public interface OfferQueue extends EventSubscriber {
       // It's important that this method is not called concurrently - doing so would open up the
       // possibility of a race between the same offers being accepted by different threads.
 
-      for (HostOffer hostOffer : hostOffers.getWeaklyConsistentOffers()) {
-        Optional<TaskInfo> assignment = acceptor.apply(hostOffer);
+      for (HostOffer offer : hostOffers.getWeaklyConsistentOffers()) {
+        Optional<TaskInfo> assignment = acceptor.apply(offer);
         if (assignment.isPresent()) {
           // Guard against an offer being removed after we grabbed it from the iterator.
           // If that happens, the offer will not exist in hostOffers, and we can immediately
           // send it back to LOST for quick reschedule.
           // Removing while iterating counts on the use of a weakly-consistent iterator being used,
           // which is a feature of ConcurrentSkipListSet.
-          if (hostOffers.remove(hostOffer.offer.getId())) {
+          if (hostOffers.remove(offer.getOffer().getId())) {
             try {
-              driver.launchTask(hostOffer.offer.getId(), assignment.get());
+              driver.launchTask(offer.getOffer().getId(), assignment.get());
               return true;
             } catch (IllegalStateException e) {
               // TODO(William Farner): Catch only the checked exception produced by Driver

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
index a17738e..1d337f6 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
@@ -40,19 +40,20 @@ import com.google.common.collect.Sets;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.stats.Stats;
+import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.util.Clock;
 
-import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
-import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
 import static java.lang.annotation.ElementType.FIELD;
@@ -63,8 +64,9 @@ import static java.util.Objects.requireNonNull;
 
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
 import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
+import static org.apache.aurora.scheduler.storage.Storage.StoreProvider;
+import static org.apache.aurora.scheduler.storage.Storage.Work;
 
 /**
  * Preempts active tasks in favor of higher priority tasks.
@@ -130,7 +132,7 @@ public interface Preemptor {
     private final SchedulingFilter schedulingFilter;
     private final Amount<Long, Time> preemptionCandidacyDelay;
     private final Clock clock;
-    private final MaintenanceController maintenance;
+    private final AtomicLong missingAttributes;
 
     /**
      * Creates a new preemptor.
@@ -151,7 +153,7 @@ public interface Preemptor {
         SchedulingFilter schedulingFilter,
         @PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay,
         Clock clock,
-        MaintenanceController maintenance) {
+        StatsProvider statsProvider) {
 
       this.storage = requireNonNull(storage);
       this.stateManager = requireNonNull(stateManager);
@@ -159,7 +161,7 @@ public interface Preemptor {
       this.schedulingFilter = requireNonNull(schedulingFilter);
       this.preemptionCandidacyDelay = requireNonNull(preemptionCandidacyDelay);
       this.clock = requireNonNull(clock);
-      this.maintenance = requireNonNull(maintenance);
+      missingAttributes = statsProvider.makeCounter("preemptor_missing_attributes");
     }
 
     private List<IAssignedTask> fetch(Query.Builder query, Predicate<IScheduledTask> filter) {
@@ -200,24 +202,24 @@ public interface Preemptor {
     private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
         new Function<HostOffer, ResourceSlot>() {
           @Override
-          public ResourceSlot apply(HostOffer hostOffer) {
-            return ResourceSlot.from(hostOffer.getOffer());
+          public ResourceSlot apply(HostOffer offer) {
+            return ResourceSlot.from(offer.getOffer());
           }
         };
 
     private static final Function<HostOffer, String> OFFER_TO_HOST =
         new Function<HostOffer, String>() {
           @Override
-          public String apply(HostOffer hostOffer) {
-            return hostOffer.getOffer().getHostname();
+          public String apply(HostOffer offer) {
+            return offer.getOffer().getHostname();
           }
         };
 
-    private static final Function<HostOffer, MaintenanceMode> OFFER_TO_MODE =
-        new Function<HostOffer, MaintenanceMode>() {
+    private static final Function<HostOffer, IHostAttributes> OFFER_TO_ATTRIBUTES =
+        new Function<HostOffer, IHostAttributes>() {
           @Override
-          public MaintenanceMode apply(HostOffer hostOffer) {
-            return hostOffer.getMode();
+          public IHostAttributes apply(HostOffer offer) {
+            return offer.getAttributes();
           }
         };
 
@@ -255,18 +257,17 @@ public interface Preemptor {
           // us.
           return Optional.absent();
         }
-        MaintenanceMode mode =
-            Iterables.getOnlyElement(FluentIterable.from(offers).transform(OFFER_TO_MODE).toSet());
+        IHostAttributes attributes = Iterables.getOnlyElement(
+            FluentIterable.from(offers).transform(OFFER_TO_ATTRIBUTES).toSet());
 
-        Set<SchedulingFilter.Veto> vetos = schedulingFilter.filter(
+        Set<SchedulingFilter.Veto> vetoes = schedulingFilter.filter(
             slackResources,
-            host,
-            mode,
+            attributes,
             pendingTask.getTask(),
             pendingTask.getTaskId(),
             attributeAggregate);
 
-        if (vetos.isEmpty()) {
+        if (vetoes.isEmpty()) {
           return Optional.<Set<IAssignedTask>>of(ImmutableSet.<IAssignedTask>of());
         }
       }
@@ -289,26 +290,40 @@ public interface Preemptor {
             ResourceSlot.sum(Iterables.transform(toPreemptTasks, TASK_TO_RESOURCES)),
             slackResources);
 
-        Set<SchedulingFilter.Veto> vetos = schedulingFilter.filter(
+        Optional<IHostAttributes> attributes = getHostAttributes(host);
+        if (!attributes.isPresent()) {
+          missingAttributes.incrementAndGet();
+          continue;
+        }
+
+        Set<SchedulingFilter.Veto> vetoes = schedulingFilter.filter(
             totalResource,
-            host,
-            maintenance.getMode(host),
+            attributes.get(),
             pendingTask.getTask(),
             pendingTask.getTaskId(),
             attributeAggregate);
 
-        if (vetos.isEmpty()) {
+        if (vetoes.isEmpty()) {
           return Optional.<Set<IAssignedTask>>of(ImmutableSet.copyOf(toPreemptTasks));
         }
       }
       return Optional.absent();
     }
 
+    private Optional<IHostAttributes> getHostAttributes(final String host) {
+      return storage.weaklyConsistentRead(new Work.Quiet<Optional<IHostAttributes>>() {
+        @Override
+        public Optional<IHostAttributes> apply(StoreProvider storeProvider) {
+          return storeProvider.getAttributeStore().getHostAttributes(host);
+        }
+      });
+    }
+
     private static final Function<HostOffer, String> OFFER_TO_SLAVE_ID =
         new Function<HostOffer, String>() {
           @Override
-          public String apply(HostOffer hostOffer) {
-            return hostOffer.getOffer().getSlaveId().getValue();
+          public String apply(HostOffer offer) {
+            return offer.getOffer().getSlaveId().getValue();
           }
         };
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java b/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java
index d15d9e6..2accb4e 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java
@@ -43,7 +43,7 @@ class RandomJitterReturnDelay implements OfferReturnDelay {
   }
 
   @Override
-  public Amount<Integer, Time> get() {
-    return Amount.of(minHoldTimeMs + random.nextInt(maxJitterWindowMs), Time.MILLISECONDS);
+  public Amount<Long, Time> get() {
+    return Amount.of((long) minHoldTimeMs + random.nextInt(maxJitterWindowMs), Time.MILLISECONDS);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index b23457e..e2ba8b8 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -41,6 +41,7 @@ import com.twitter.common.stats.Stats;
 import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.util.Clock;
 
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
@@ -65,7 +66,6 @@ import static java.util.Objects.requireNonNull;
 
 import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
 
 /**
  * Enables scheduling and preemption of tasks.
@@ -134,20 +134,20 @@ public interface TaskScheduler extends EventSubscriber {
 
       return new Function<HostOffer, Optional<TaskInfo>>() {
         @Override
-        public Optional<TaskInfo> apply(HostOffer hostOffer) {
+        public Optional<TaskInfo> apply(HostOffer offer) {
           Optional<String> reservedTaskId =
-              reservations.getSlaveReservation(hostOffer.getOffer().getSlaveId());
+              reservations.getSlaveReservation(offer.getOffer().getSlaveId());
           if (reservedTaskId.isPresent()) {
             if (taskId.equals(reservedTaskId.get())) {
               // Slave is reserved to satisfy this task.
-              return assigner.maybeAssign(hostOffer, task, attributeAggregate);
+              return assigner.maybeAssign(offer, task, attributeAggregate);
             } else {
               // Slave is reserved for another task.
               return Optional.absent();
             }
           } else {
             // Slave is not reserved.
-            return assigner.maybeAssign(hostOffer, task, attributeAggregate);
+            return assigner.maybeAssign(offer, task, attributeAggregate);
           }
         }
       };

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
index fc17cac..ca53303 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
@@ -20,11 +20,11 @@ import java.util.Set;
 import javax.inject.Inject;
 import javax.inject.Qualifier;
 
-import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
 import static java.lang.annotation.ElementType.FIELD;
@@ -60,13 +60,12 @@ class NotifyingSchedulingFilter implements SchedulingFilter {
   @Override
   public Set<Veto> filter(
       ResourceSlot offer,
-      String slaveHost,
-      MaintenanceMode mode,
+      IHostAttributes hostAttributes,
       ITaskConfig task,
       String taskId,
       AttributeAggregate jobState) {
 
-    Set<Veto> vetoes = delegate.filter(offer, slaveHost, mode, task, taskId, jobState);
+    Set<Veto> vetoes = delegate.filter(offer, hostAttributes, task, taskId, jobState);
     if (!vetoes.isEmpty()) {
       eventSink.post(new Vetoed(taskId, vetoes));
     }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
index c37272c..c1c5f26 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
@@ -18,8 +18,8 @@ import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
 /**
@@ -105,8 +105,6 @@ public interface SchedulingFilter {
    * Applies a task against the filter with the given resources, and on the host.
    *
    * @param offer Resources offered.
-   * @param slaveHost Host that the resources are associated with.
-   * @param mode Maintenance mode of the host that the resources are associated with.
    * @param task Task.
    * @param taskId Canonical ID of the task.
    * @param attributeAggregate Attribute information for tasks in the job containing {@code task}.
@@ -115,8 +113,7 @@ public interface SchedulingFilter {
    */
   Set<Veto> filter(
       ResourceSlot offer,
-      String slaveHost,
-      MaintenanceMode mode,
+      IHostAttributes attributes,
       ITaskConfig task,
       String taskId,
       AttributeAggregate attributeAggregate);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
index 0533baa..cc6b53b 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
@@ -17,8 +17,6 @@ import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.Set;
 
-import javax.inject.Inject;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
@@ -33,16 +31,10 @@ import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.Work.Quiet;
-import org.apache.aurora.scheduler.storage.entities.IAttribute;
 import org.apache.aurora.scheduler.storage.entities.IConstraint;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
-import static java.util.Objects.requireNonNull;
-
 import static org.apache.aurora.gen.MaintenanceMode.DRAINED;
 import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
 import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
@@ -65,18 +57,6 @@ public class SchedulingFilterImpl implements SchedulingFilter {
 
   private static final Set<MaintenanceMode> VETO_MODES = EnumSet.of(DRAINING, DRAINED);
 
-  private final Storage storage;
-
-  /**
-   * Creates a new scheduling filter.
-   *
-   * @param storage Interface to accessing the task store.
-   */
-  @Inject
-  public SchedulingFilterImpl(Storage storage) {
-    this.storage = requireNonNull(storage);
-  }
-
   /**
    * A function that may veto a task.
    */
@@ -187,7 +167,7 @@ public class SchedulingFilterImpl implements SchedulingFilter {
 
   private FilterRule getConstraintFilter(
       final AttributeAggregate jobState,
-      final String slaveHost) {
+      final IHostAttributes offerAttributes) {
 
     return new FilterRule() {
       @Override
@@ -196,32 +176,23 @@ public class SchedulingFilterImpl implements SchedulingFilter {
           return ImmutableList.of();
         }
 
-        // In the interest of performance, we perform a weakly consistent read here.  The biggest
-        // risk of this is that we might schedule against stale host attributes, or we might fail
-        // to correctly satisfy a diversity constraint.  Given that the likelihood is relatively low
-        // for both of these, and the impact is also low, the weak consistency is acceptable.
-        return storage.weaklyConsistentRead(new Quiet<Iterable<Veto>>() {
-          @Override
-          public Iterable<Veto> apply(final StoreProvider storeProvider) {
-            ConstraintFilter constraintFilter = new ConstraintFilter(
-                jobState,
-                AttributeStore.Util.attributesOrNone(storeProvider, slaveHost));
-            ImmutableList.Builder<Veto> vetoes = ImmutableList.builder();
-            for (IConstraint constraint : VALUES_FIRST.sortedCopy(task.getConstraints())) {
-              Optional<Veto> veto = constraintFilter.getVeto(constraint);
-              if (veto.isPresent()) {
-                vetoes.add(veto.get());
-                if (isValueConstraint(constraint)) {
-                  // Break when a value constraint mismatch is found to avoid other
-                  // potentially-expensive operations to satisfy other constraints.
-                  break;
-                }
-              }
+        ConstraintFilter constraintFilter = new ConstraintFilter(
+            jobState,
+            offerAttributes.getAttributes());
+        ImmutableList.Builder<Veto> vetoes = ImmutableList.builder();
+        for (IConstraint constraint : VALUES_FIRST.sortedCopy(task.getConstraints())) {
+          Optional<Veto> veto = constraintFilter.getVeto(constraint);
+          if (veto.isPresent()) {
+            vetoes.add(veto.get());
+            if (isValueConstraint(constraint)) {
+              // Break when a value constraint mismatch is found to avoid other
+              // potentially-expensive operations to satisfy other constraints.
+              break;
             }
-
-            return vetoes.build();
           }
-        });
+        }
+
+        return vetoes.build();
       }
     };
   }
@@ -240,38 +211,31 @@ public class SchedulingFilterImpl implements SchedulingFilter {
     return builder.build();
   }
 
-  private boolean isDedicated(final String slaveHost) {
-    Iterable<IAttribute> slaveAttributes =
-        storage.weaklyConsistentRead(new Quiet<Iterable<IAttribute>>() {
-          @Override
-          public Iterable<IAttribute> apply(final StoreProvider storeProvider) {
-            return AttributeStore.Util.attributesOrNone(storeProvider, slaveHost);
-          }
-        });
-
-    return Iterables.any(slaveAttributes, new ConstraintFilter.NameFilter(DEDICATED_ATTRIBUTE));
+  private boolean isDedicated(IHostAttributes attributes) {
+    return Iterables.any(
+        attributes.getAttributes(),
+        new ConstraintFilter.NameFilter(DEDICATED_ATTRIBUTE));
   }
 
   @Override
   public Set<Veto> filter(
       ResourceSlot offer,
-      String slaveHost,
-      MaintenanceMode mode,
+      IHostAttributes attributes,
       ITaskConfig task,
       String taskId,
       AttributeAggregate attributeAggregate) {
 
-    if (!ConfigurationManager.isDedicated(task) && isDedicated(slaveHost)) {
+    if (!ConfigurationManager.isDedicated(task) && isDedicated(attributes)) {
       return ImmutableSet.of(DEDICATED_HOST_VETO);
     }
 
-    Optional<Veto> maintenanceVeto = getMaintenanceVeto(mode);
+    Optional<Veto> maintenanceVeto = getMaintenanceVeto(attributes.getMode());
     if (maintenanceVeto.isPresent()) {
       return maintenanceVeto.asSet();
     }
 
     return ImmutableSet.<Veto>builder()
-        .addAll(getConstraintFilter(attributeAggregate, slaveHost).apply(task))
+        .addAll(getConstraintFilter(attributeAggregate, attributes).apply(task))
         .addAll(getResourceVetoes(offer, task))
         .build();
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/http/Offers.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Offers.java b/src/main/java/org/apache/aurora/scheduler/http/Offers.java
index 446dc74..6d75c3a 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/Offers.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/Offers.java
@@ -27,14 +27,14 @@ import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableMap;
 
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.async.OfferQueue;
 import org.apache.mesos.Protos.Attribute;
 import org.apache.mesos.Protos.ExecutorID;
-import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.Resource;
 import org.apache.mesos.Protos.Value.Range;
 
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
+import static org.apache.mesos.Protos.Offer;
 
 /**
  * Servlet that exposes resource offers that the scheduler is currently retaining.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
index ffcbc97..ffc30bb 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
@@ -30,6 +30,7 @@ import com.twitter.common.inject.TimedInterceptor.Timed;
 import com.twitter.common.stats.Stats;
 
 import org.apache.aurora.GuiceUtils.AllowUnchecked;
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.TaskLauncher;
 import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.events.EventSink;
@@ -39,10 +40,10 @@ import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.mesos.Protos.ExecutorID;
 import org.apache.mesos.Protos.FrameworkID;
 import org.apache.mesos.Protos.MasterInfo;
-import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.SlaveID;
 import org.apache.mesos.Protos.TaskStatus;
@@ -55,6 +56,8 @@ import static java.lang.annotation.ElementType.PARAMETER;
 import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
 
+import static org.apache.mesos.Protos.Offer;
+
 /**
  * Location for communication with mesos.
  */
@@ -90,7 +93,7 @@ class MesosSchedulerImpl implements Scheduler {
    * @param storage Store to save host attributes into.
    * @param lifecycle Application lifecycle manager.
    * @param taskLaunchers Task launchers, which will be used in order.  Calls to
-   *                      {@link TaskLauncher#willUse(Offer)} and
+   *                      {@link TaskLauncher#willUse(HostOffer)} and
    *                      {@link TaskLauncher#statusUpdate(TaskStatus)} are propagated to provided
    *                      launchers, ceasing after the first match (based on a return value of
    *                      {@code true}.
@@ -153,37 +156,30 @@ class MesosSchedulerImpl implements Scheduler {
   public void resourceOffers(SchedulerDriver driver, final List<Offer> offers) {
     Preconditions.checkState(isRegistered, "Must be registered before receiving offers.");
 
-    // Store all host attributes in a single write operation to prevent other threads from
-    // securing the storage lock between saves.  We also save the host attributes before passing
-    // offers elsewhere to ensure that host attributes are available before attempting to
-    // schedule tasks associated with offers.
-    // TODO(wfarner): Reconsider the requirements here, augment the task scheduler to skip over
-    //                offers when the host attributes cannot be found. (AURORA-137)
-
     executor.execute(new Runnable() {
       @Override
       public void run() {
+        // TODO(wfarner): Reconsider the requirements here, augment the task scheduler to skip over
+        //                offers when the host attributes cannot be found. (AURORA-137)
         storage.write(new MutateWork.NoResult.Quiet() {
           @Override
           protected void execute(MutableStoreProvider storeProvider) {
-            for (final Offer offer : offers) {
-              storeProvider.getAttributeStore().saveHostAttributes(
-                  AttributeStore.Util.mergeOffer(storeProvider.getAttributeStore(), offer));
+            for (Offer offer : offers) {
+              IHostAttributes attributes =
+                  AttributeStore.Util.mergeOffer(storeProvider.getAttributeStore(), offer);
+              storeProvider.getAttributeStore().saveHostAttributes(attributes);
+              if (LOG.isLoggable(Level.FINE)) {
+                LOG.log(Level.FINE, String.format("Received offer: %s", offer));
+              }
+              totalResourceOffers.incrementAndGet();
+              for (TaskLauncher launcher : taskLaunchers) {
+                if (launcher.willUse(new HostOffer(offer, attributes))) {
+                  break;
+                }
+              }
             }
           }
         });
-
-        for (Offer offer : offers) {
-          if (LOG.isLoggable(Level.FINE)) {
-            LOG.log(Level.FINE, String.format("Received offer: %s", offer));
-          }
-          totalResourceOffers.incrementAndGet();
-          for (TaskLauncher launcher : taskLaunchers) {
-            if (launcher.willUse(offer)) {
-              break;
-            }
-          }
-        }
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
index 077699f..86440eb 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
@@ -52,6 +52,7 @@ import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
  * All state-changing functions return their results.  Additionally, all state-changing functions
  * will ignore requests to change state of unknown hosts and subsequently omit these hosts from
  * return values.
+ * TODO(wfarner): Convert use of HostStatus in this API to IHostStatus (immutable).
  */
 public interface MaintenanceController {
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index 9c9b659..77db411 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -20,6 +20,7 @@ import javax.inject.Inject;
 
 import com.google.common.base.Optional;
 
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.Resources;
@@ -29,12 +30,11 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.TaskInfo;
 
 import static java.util.Objects.requireNonNull;
 
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
+import static org.apache.mesos.Protos.Offer;
 
 /**
  * Responsible for matching a task against an offer.
@@ -45,13 +45,13 @@ public interface TaskAssigner {
    * Tries to match a task against an offer.  If a match is found, the assigner should
    * make the appropriate changes to the task and provide a non-empty result.
    *
-   * @param hostOffer The resource offer.
+   * @param offer The resource offer.
    * @param task The task to match against and optionally assign.
    * @param attributeAggregate Attribute information for tasks in the job containing {@code task}.
    * @return Instructions for launching the task if matching and assignment were successful.
    */
   Optional<TaskInfo> maybeAssign(
-      HostOffer hostOffer,
+      HostOffer offer,
       IScheduledTask task,
       AttributeAggregate attributeAggregate);
 
@@ -89,21 +89,20 @@ public interface TaskAssigner {
 
     @Override
     public Optional<TaskInfo> maybeAssign(
-        HostOffer hostOffer,
+        HostOffer offer,
         IScheduledTask task,
         AttributeAggregate attributeAggregate) {
 
       Set<Veto> vetoes = filter.filter(
-          ResourceSlot.from(hostOffer.getOffer()),
-          hostOffer.getOffer().getHostname(),
-          hostOffer.getMode(),
+          ResourceSlot.from(offer.getOffer()),
+          offer.getAttributes(),
           task.getAssignedTask().getTask(),
           Tasks.id(task),
           attributeAggregate);
       if (vetoes.isEmpty()) {
-        return Optional.of(assign(hostOffer.getOffer(), task));
+        return Optional.of(assign(offer.getOffer(), task));
       } else {
-        LOG.fine("Slave " + hostOffer.getOffer().getHostname() + " vetoed task " + Tasks.id(task)
+        LOG.fine("Slave " + offer.getOffer().getHostname() + " vetoed task " + Tasks.id(task)
             + ": " + vetoes);
         return Optional.absent();
       }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
index 844a38a..1c9904c 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
@@ -35,6 +35,7 @@ import com.twitter.common.quantity.Data;
 import com.twitter.common.quantity.Time;
 
 import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.async.OfferQueue;
 import org.apache.aurora.scheduler.base.AsyncUtil;
 import org.apache.aurora.scheduler.base.Conversions;
@@ -49,8 +50,6 @@ import static java.lang.annotation.ElementType.PARAMETER;
 import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
 
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
-
 /**
  * Module to configure export of cluster-wide resource allocation and consumption statistics.
  */
@@ -115,13 +114,13 @@ public class AsyncStatsModule extends AbstractModule {
     private static final Function<HostOffer, MachineResource> TO_RESOURCE =
         new Function<HostOffer, MachineResource>() {
           @Override
-          public MachineResource apply(HostOffer hostOffer) {
-            Resources resources = Resources.from(hostOffer.getOffer());
+          public MachineResource apply(HostOffer offer) {
+            Resources resources = Resources.from(offer.getOffer());
             IResourceAggregate quota = IResourceAggregate.build(new ResourceAggregate()
                 .setNumCpus(resources.getNumCpus())
                 .setRamMb(resources.getRam().as(Data.MB))
                 .setDiskMb(resources.getDisk().as(Data.MB)));
-            return new MachineResource(quota, Conversions.isDedicated(hostOffer.getOffer()));
+            return new MachineResource(quota, Conversions.isDedicated(offer.getOffer()));
           }
         };
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
index 083a635..4673e80 100644
--- a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
@@ -22,14 +22,15 @@ import com.google.common.collect.Iterables;
 import com.twitter.common.collections.Pair;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
+import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.async.OfferQueue;
 import org.apache.aurora.scheduler.configuration.Resources;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage.StorageException;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.mesos.Protos.Attribute;
 import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.Resource;
 import org.apache.mesos.Protos.SlaveID;
@@ -47,6 +48,7 @@ import org.junit.Test;
 import static org.apache.aurora.gen.ScheduleStatus.FAILED;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.scheduler.configuration.ConfigurationManager.HOST_CONSTRAINT;
+import static org.apache.mesos.Protos.Offer;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertTrue;
 
@@ -60,7 +62,7 @@ public class UserTaskLauncherTest extends EasyMockTest {
   private static final String TASK_ID_A = "task_id_a";
 
   private static final OfferID OFFER_ID = OfferID.newBuilder().setValue("OfferId").build();
-  private static final Offer OFFER = createOffer(SLAVE_ID, SLAVE_HOST_1, 4, 1024, 1024);
+  private static final HostOffer OFFER = createOffer(SLAVE_ID, SLAVE_HOST_1, 4, 1024, 1024);
 
   private OfferQueue offerQueue;
   private StateManager stateManager;
@@ -171,13 +173,13 @@ public class UserTaskLauncherTest extends EasyMockTest {
     launcher.statusUpdate(status);
   }
 
-  private static Offer createOffer(SlaveID slave, String slaveHost, double cpu,
+  private static HostOffer createOffer(SlaveID slave, String slaveHost, double cpu,
       double ramMb, double diskMb) {
     return createOffer(slave, slaveHost, cpu, ramMb, diskMb,
         ImmutableSet.<Pair<Integer, Integer>>of());
   }
 
-  private static Offer createOffer(SlaveID slave, String slaveHost, double cpu,
+  private static HostOffer createOffer(SlaveID slave, String slaveHost, double cpu,
       double ramMb, double diskMb, Set<Pair<Integer, Integer>> ports) {
 
     Ranges portRanges = Ranges.newBuilder()
@@ -189,7 +191,7 @@ public class UserTaskLauncherTest extends EasyMockTest {
         }))
         .build();
 
-    return Offer.newBuilder()
+    Offer mesosOffer = Offer.newBuilder()
         .addResources(Resource.newBuilder().setType(Type.SCALAR).setName(Resources.CPUS)
             .setScalar(Scalar.newBuilder().setValue(cpu)))
         .addResources(Resource.newBuilder().setType(Type.SCALAR).setName(Resources.RAM_MB)
@@ -206,5 +208,6 @@ public class UserTaskLauncherTest extends EasyMockTest {
         .setFrameworkId(FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build())
         .setId(OFFER_ID)
         .build();
+    return new HostOffer(mesosOffer, IHostAttributes.build(new HostAttributes()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
index 758a8d4..422d5a9 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
@@ -30,20 +30,22 @@ import com.twitter.common.util.testing.FakeClock;
 
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.ExecutorConfig;
+import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.comm.AdjustRetainedTasks;
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.async.GcExecutorLauncher.GcExecutorSettings;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.Resources;
 import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.Resource;
 import org.apache.mesos.Protos.SlaveID;
@@ -58,6 +60,7 @@ import static org.apache.aurora.gen.ScheduleStatus.FAILED;
 import static org.apache.aurora.scheduler.async.GcExecutorLauncher.INSUFFICIENT_OFFERS_STAT_NAME;
 import static org.apache.aurora.scheduler.async.GcExecutorLauncher.LOST_TASKS_STAT_NAME;
 import static org.apache.aurora.scheduler.async.GcExecutorLauncher.SYSTEM_TASK_PREFIX;
+import static org.apache.mesos.Protos.Offer;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -67,13 +70,15 @@ public class GcExecutorLauncherTest extends EasyMockTest {
 
   private static final String HOST = "slave-host";
 
-  private static final Offer OFFER = Offer.newBuilder()
+  private static final Offer MESOS_OFFER = Offer.newBuilder()
       .setSlaveId(SlaveID.newBuilder().setValue("slave-id"))
       .setHostname(HOST)
       .setFrameworkId(FrameworkID.newBuilder().setValue("framework-id").build())
       .setId(OfferID.newBuilder().setValue("offer-id"))
       .addAllResources(GcExecutorLauncher.TOTAL_GC_EXECUTOR_RESOURCES.toResourceList())
       .build();
+  private static final HostOffer OFFER =
+      new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes()));
 
   private static final String JOB_A = "jobA";
   private static final String TASK_UUID = "gc";
@@ -166,18 +171,19 @@ public class GcExecutorLauncherTest extends EasyMockTest {
         Resources.subtract(
             GcExecutorLauncher.TOTAL_GC_EXECUTOR_RESOURCES,
             GcExecutorLauncher.EPSILON).toResourceList();
-    Offer smallOffer = OFFER.toBuilder()
+    Offer smallOffer = MESOS_OFFER.toBuilder()
         .clearResources()
         .addAllResources(resources)
         .build();
     assertEquals(0, insufficientOffers.get());
-    assertFalse(gcExecutorLauncher.willUse(smallOffer));
+    assertFalse(gcExecutorLauncher.willUse(
+        new HostOffer(smallOffer, IHostAttributes.build(new HostAttributes()))));
     assertEquals(1, insufficientOffers.get());
   }
 
   private static TaskStatus makeStatus(String taskId) {
     return TaskStatus.newBuilder()
-        .setSlaveId(OFFER.getSlaveId())
+        .setSlaveId(OFFER.getOffer().getSlaveId())
         .setState(TaskState.TASK_RUNNING)
         .setTaskId(TaskID.newBuilder().setValue(taskId))
         .build();
@@ -220,8 +226,12 @@ public class GcExecutorLauncherTest extends EasyMockTest {
         Maps.transformValues(Tasks.mapById(ImmutableSet.copyOf(tasks)), Tasks.GET_STATUS);
     AdjustRetainedTasks message = new AdjustRetainedTasks().setRetainedTasks(statuses);
     TaskInfo task = GcExecutorLauncher.makeGcTask(
-        HOST, OFFER.getSlaveId(), SETTINGS.getGcExecutorPath().get(), TASK_UUID, message);
-    driver.launchTask(OFFER.getId(), task);
+        HOST,
+        OFFER.getOffer().getSlaveId(),
+        SETTINGS.getGcExecutorPath().get(),
+        TASK_UUID,
+        message);
+    driver.launchTask(OFFER.getOffer().getId(), task);
   }
 
   private IScheduledTask makeTask(String jobName, ScheduleStatus status) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
index e2a198a..4cf602a 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
@@ -13,154 +13,124 @@
  */
 package org.apache.aurora.scheduler.async;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
 
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
 import com.google.common.testing.TearDown;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
 
+import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.scheduler.async.OfferQueue.LaunchException;
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
 import org.apache.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.mesos.Driver;
-import org.apache.aurora.scheduler.state.MaintenanceController;
-import org.apache.mesos.Protos.Offer;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
 import org.apache.mesos.Protos.TaskInfo;
-import org.easymock.IAnswer;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
+import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
+import static org.apache.aurora.gen.MaintenanceMode.NONE;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class OfferQueueImplTest extends EasyMockTest {
 
-  private static final Amount<Integer, Time> RETURN_DELAY = Amount.of(1, Time.DAYS);
+  private static final Amount<Long, Time> RETURN_DELAY = Amount.of(1L, Time.DAYS);
   private static final String HOST_A = "HOST_A";
-  private static final Offer OFFER_A = Offers.makeOffer("OFFER_A", HOST_A);
+  private static final HostOffer OFFER_A = new HostOffer(
+      Offers.makeOffer("OFFER_A", HOST_A),
+      IHostAttributes.build(new HostAttributes().setMode(NONE)));
   private static final String HOST_B = "HOST_B";
-  private static final Offer OFFER_B = Offers.makeOffer("OFFER_B", HOST_B);
+  private static final HostOffer OFFER_B = new HostOffer(
+      Offers.makeOffer("OFFER_B", HOST_B),
+      IHostAttributes.build(new HostAttributes().setMode(NONE)));
   private static final String HOST_C = "HOST_C";
-  private static final Offer OFFER_C = Offers.makeOffer("OFFER_C", HOST_C);
+  private static final HostOffer OFFER_C = new HostOffer(
+      Offers.makeOffer("OFFER_C", HOST_C),
+      IHostAttributes.build(new HostAttributes().setMode(NONE)));
 
   private Driver driver;
-  private ScheduledExecutorService executor;
-  private ExecutorService testExecutor;
-  private MaintenanceController maintenanceController;
+  private FakeScheduledExecutor clock;
   private Function<HostOffer, Optional<TaskInfo>> offerAcceptor;
   private OfferQueueImpl offerQueue;
 
   @Before
   public void setUp() {
     driver = createMock(Driver.class);
-    ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).build();
-    executor = Executors.newSingleThreadScheduledExecutor(threadFactory);
-    testExecutor = Executors.newCachedThreadPool(threadFactory);
+    ScheduledExecutorService executorMock = createMock(ScheduledExecutorService.class);
+    clock = FakeScheduledExecutor.scheduleExecutor(executorMock);
+
     addTearDown(new TearDown() {
       @Override
       public void tearDown() throws Exception {
-        new ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute();
-        new ExecutorServiceShutdown(testExecutor, Amount.of(1L, Time.SECONDS)).execute();
+        clock.assertEmpty();
       }
     });
-    maintenanceController = createMock(MaintenanceController.class);
     offerAcceptor = createMock(new Clazz<Function<HostOffer, Optional<TaskInfo>>>() { });
     OfferReturnDelay returnDelay = new OfferReturnDelay() {
       @Override
-      public Amount<Integer, Time> get() {
+      public Amount<Long, Time> get() {
         return RETURN_DELAY;
       }
     };
-    offerQueue = new OfferQueueImpl(driver, returnDelay, executor, maintenanceController);
+    offerQueue = new OfferQueueImpl(driver, returnDelay, executorMock);
   }
 
   @Test
-  public void testNoDeadlock() throws Exception {
-    // Test that a blocked call to maintenanceController does not result in a deadlock between
-    // the intrinsic lock and the storage lock.
-    final CountDownLatch launchAttempted = new CountDownLatch(1);
-    expect(maintenanceController.getMode(HOST_A)).andAnswer(new IAnswer<MaintenanceMode>() {
-      @Override
-      public MaintenanceMode answer() throws InterruptedException {
-        launchAttempted.await();
-        return MaintenanceMode.NONE;
-      }
-    });
-
-    control.replay();
+  public void testOffersSorted() throws Exception {
+    // Ensures that non-DRAINING offers are preferred - the DRAINING offer would be tried last.
 
-    final CountDownLatch offerAdded = new CountDownLatch(1);
-    testExecutor.submit(new Runnable() {
-      @Override
-      public void run() {
-        offerQueue.addOffer(OFFER_A);
-        offerAdded.countDown();
-      }
-    });
-    testExecutor.submit(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          offerQueue.launchFirst(offerAcceptor);
-          launchAttempted.countDown();
-        } catch (LaunchException e) {
-          throw Throwables.propagate(e);
-        }
-      }
-    });
+    HostOffer offerA = setMode(OFFER_A, DRAINING);
+    HostOffer offerC = setMode(OFFER_C, DRAINING);
 
-    launchAttempted.await();
-    offerAdded.await();
-  }
+    TaskInfo task = TaskInfo.getDefaultInstance();
+    expect(offerAcceptor.apply(OFFER_B)).andReturn(Optional.of(task));
+    driver.launchTask(OFFER_B.getOffer().getId(), task);
 
-  @Test
-  public void testOffersSorted() throws Exception {
-    MaintenanceMode modeA = MaintenanceMode.NONE;
-    MaintenanceMode modeB = MaintenanceMode.DRAINING;
-    MaintenanceMode modeC = MaintenanceMode.NONE;
+    driver.declineOffer(offerA.getOffer().getId());
+    driver.declineOffer(offerC.getOffer().getId());
 
-    HostOffer hostOfferA = new HostOffer(OFFER_A, modeA);
-    HostOffer hostOfferB = new HostOffer(OFFER_B, modeB);
-    HostOffer hostOfferC = new HostOffer(OFFER_C, modeC);
+    control.replay();
 
-    expect(maintenanceController.getMode(HOST_A)).andReturn(modeA);
-    expect(maintenanceController.getMode(HOST_B)).andReturn(modeB);
-    expect(maintenanceController.getMode(HOST_C)).andReturn(modeC);
-    expect(offerAcceptor.apply(hostOfferA)).andReturn(Optional.<TaskInfo>absent());
-    expect(offerAcceptor.apply(hostOfferB)).andReturn(Optional.<TaskInfo>absent());
-    expect(offerAcceptor.apply(hostOfferC)).andReturn(Optional.<TaskInfo>absent());
+    offerQueue.addOffer(offerA);
+    offerQueue.addOffer(OFFER_B);
+    offerQueue.addOffer(offerC);
+    assertTrue(offerQueue.launchFirst(offerAcceptor));
+    clock.advance(RETURN_DELAY);
+  }
 
+  @Test
+  public void testFlushOffers() throws Exception {
     control.replay();
 
     offerQueue.addOffer(OFFER_A);
     offerQueue.addOffer(OFFER_B);
-    offerQueue.addOffer(OFFER_C);
+    offerQueue.driverDisconnected(new DriverDisconnected());
     assertFalse(offerQueue.launchFirst(offerAcceptor));
+    clock.advance(RETURN_DELAY);
   }
 
   @Test
-  public void testFlushOffers() throws Exception {
-    expect(maintenanceController.getMode(HOST_A)).andReturn(MaintenanceMode.NONE);
-    expect(maintenanceController.getMode(HOST_B)).andReturn(MaintenanceMode.NONE);
+  public void testDeclineOffer() throws Exception {
+    driver.declineOffer(OFFER_A.getOffer().getId());
 
     control.replay();
 
     offerQueue.addOffer(OFFER_A);
-    offerQueue.addOffer(OFFER_B);
-    offerQueue.driverDisconnected(new DriverDisconnected());
-    assertFalse(offerQueue.launchFirst(offerAcceptor));
+    clock.advance(RETURN_DELAY);
+  }
+
+  private static HostOffer setMode(HostOffer offer, MaintenanceMode mode) {
+    return new HostOffer(
+        offer.getOffer(),
+        IHostAttributes.build(offer.getAttributes().newBuilder().setMode(mode)));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
index c0fa462..59bfbcb 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Lists;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Data;
 import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.testing.FakeClock;
 
@@ -42,6 +43,7 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
@@ -49,7 +51,6 @@ import org.apache.aurora.scheduler.configuration.Resources;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
-import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
@@ -60,6 +61,7 @@ import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.mem.MemStorage;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.easymock.IExpectationSetters;
@@ -70,7 +72,6 @@ import static org.apache.aurora.gen.MaintenanceMode.NONE;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
 import static org.apache.aurora.scheduler.async.Preemptor.PreemptorImpl;
 import static org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import static org.apache.mesos.Protos.Offer;
@@ -103,7 +104,7 @@ public class PreemptorImplTest extends EasyMockTest {
   private StateManager stateManager;
   private SchedulingFilter schedulingFilter;
   private FakeClock clock;
-  private MaintenanceController maintenance;
+  private StatsProvider statsProvider;
   private OfferQueue offerQueue;
   private AttributeAggregate emptyJob;
 
@@ -112,8 +113,8 @@ public class PreemptorImplTest extends EasyMockTest {
     storageUtil = new StorageTestUtil(this);
     storageUtil.expectOperations();
     stateManager = createMock(StateManager.class);
-    maintenance = createMock(MaintenanceController.class);
     clock = new FakeClock();
+    statsProvider = new FakeStatsProvider();
     offerQueue = createMock(OfferQueue.class);
     emptyJob = new AttributeAggregate(
         Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
@@ -128,7 +129,7 @@ public class PreemptorImplTest extends EasyMockTest {
         schedulingFilter,
         PREEMPTION_DELAY,
         clock,
-        maintenance);
+        statsProvider);
 
     preemptor.findPreemptionSlotFor(pendingTask.getAssignedTask().getTaskId(), emptyJob);
   }
@@ -150,12 +151,10 @@ public class PreemptorImplTest extends EasyMockTest {
         IScheduledTask.setFromBuilders(Arrays.asList(returnedTasks)));
   }
 
-  private void expectGetMaintenance(String host) {
-    expect(maintenance.getMode(host)).andReturn(MaintenanceMode.NONE);
-  }
-
   @Test
   public void testPreempted() throws Exception {
+    setUpHost(HOST_A, RACK_A);
+
     schedulingFilter = createMock(SchedulingFilter.class);
     ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A);
     runOnHost(lowPriority, HOST_A);
@@ -168,8 +167,6 @@ public class PreemptorImplTest extends EasyMockTest {
     expectGetPendingTasks(highPriority);
     expectGetActiveTasks(lowPriority);
 
-    expectGetMaintenance(HOST_A);
-
     expectFiltering();
     expectPreempted(lowPriority);
 
@@ -179,6 +176,8 @@ public class PreemptorImplTest extends EasyMockTest {
 
   @Test
   public void testLowestPriorityPreempted() throws Exception {
+    setUpHost(HOST_A, RACK_A);
+
     schedulingFilter = createMock(SchedulingFilter.class);
     ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 10);
     runOnHost(lowPriority, HOST_A);
@@ -194,7 +193,6 @@ public class PreemptorImplTest extends EasyMockTest {
     expectGetPendingTasks(highPriority);
     expectGetActiveTasks(lowerPriority, lowerPriority);
 
-    expectGetMaintenance(HOST_A);
     expectFiltering();
     expectPreempted(lowerPriority);
 
@@ -204,6 +202,8 @@ public class PreemptorImplTest extends EasyMockTest {
 
   @Test
   public void testOnePreemptableTask() throws Exception {
+    setUpHost(HOST_A, RACK_A);
+
     schedulingFilter = createMock(SchedulingFilter.class);
     ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 100);
     runOnHost(highPriority, HOST_A);
@@ -222,7 +222,6 @@ public class PreemptorImplTest extends EasyMockTest {
     expectGetPendingTasks(pendingPriority);
     expectGetActiveTasks(highPriority, lowerPriority, lowestPriority);
 
-    expectGetMaintenance(HOST_A);
     expectFiltering();
     expectPreempted(lowestPriority);
 
@@ -250,6 +249,8 @@ public class PreemptorImplTest extends EasyMockTest {
 
   @Test
   public void testProductionPreemptingNonproduction() throws Exception {
+    setUpHost(HOST_A, RACK_A);
+
     schedulingFilter = createMock(SchedulingFilter.class);
     // Use a very low priority for the production task to show that priority is irrelevant.
     ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
@@ -263,7 +264,6 @@ public class PreemptorImplTest extends EasyMockTest {
     expectGetPendingTasks(p1);
     expectGetActiveTasks(a1);
 
-    expectGetMaintenance(HOST_A);
     expectFiltering();
     expectPreempted(a1);
 
@@ -273,6 +273,8 @@ public class PreemptorImplTest extends EasyMockTest {
 
   @Test
   public void testProductionPreemptingNonproductionAcrossUsers() throws Exception {
+    setUpHost(HOST_A, RACK_A);
+
     schedulingFilter = createMock(SchedulingFilter.class);
     // Use a very low priority for the production task to show that priority is irrelevant.
     ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
@@ -286,7 +288,6 @@ public class PreemptorImplTest extends EasyMockTest {
     expectGetPendingTasks(p1);
     expectGetActiveTasks(a1);
 
-    expectGetMaintenance(HOST_A);
     expectFiltering();
     expectPreempted(a1);
 
@@ -315,7 +316,7 @@ public class PreemptorImplTest extends EasyMockTest {
   // Ensures a production task can preempt 2 tasks on the same host.
   @Test
   public void testProductionPreemptingManyNonProduction() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
+    schedulingFilter = new SchedulingFilterImpl();
     ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
     a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
 
@@ -347,7 +348,7 @@ public class PreemptorImplTest extends EasyMockTest {
   // Ensures we select the minimal number of tasks to preempt
   @Test
   public void testMinimalSetPreempted() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
+    schedulingFilter = new SchedulingFilterImpl();
     ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
     a1.getAssignedTask().getTask().setNumCpus(4).setRamMb(4096);
 
@@ -382,7 +383,7 @@ public class PreemptorImplTest extends EasyMockTest {
   // Ensures a production task *never* preempts a production task from another job.
   @Test
   public void testProductionJobNeverPreemptsProductionJob() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
+    schedulingFilter = new SchedulingFilterImpl();
     ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1");
     p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
 
@@ -407,7 +408,7 @@ public class PreemptorImplTest extends EasyMockTest {
   // Ensures that we can preempt if a task + offer can satisfy a pending task.
   @Test
   public void testPreemptWithOfferAndTask() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
+    schedulingFilter = new SchedulingFilterImpl();
 
     setUpHost(HOST_A, RACK_A);
 
@@ -435,7 +436,7 @@ public class PreemptorImplTest extends EasyMockTest {
   // Ensures we can preempt if two tasks and an offer can satisfy a pending task.
   @Test
   public void testPreemptWithOfferAndMultipleTasks() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
+    schedulingFilter = new SchedulingFilterImpl();
 
     setUpHost(HOST_A, RACK_A);
 
@@ -468,7 +469,7 @@ public class PreemptorImplTest extends EasyMockTest {
   // Ensures we don't preempt if a host has enough slack to satisfy a pending task.
   @Test
   public void testPreemptWithLargeOffer() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
+    schedulingFilter = new SchedulingFilterImpl();
 
     setUpHost(HOST_A, RACK_A);
 
@@ -524,7 +525,7 @@ public class PreemptorImplTest extends EasyMockTest {
         schedulingFilter,
         PREEMPTION_DELAY,
         clock,
-        maintenance);
+        statsProvider);
 
     assertEquals(
         Optional.<String>absent(),
@@ -556,7 +557,9 @@ public class PreemptorImplTest extends EasyMockTest {
         .transform(new Function<Offer, HostOffer>() {
           @Override
           public HostOffer apply(Offer offer) {
-            return new HostOffer(offer, MaintenanceMode.NONE);
+            return new HostOffer(
+                offer,
+                IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE)));
           }
         });
     expect(offerQueue.getOffers()).andReturn(hostOffers);
@@ -569,8 +572,7 @@ public class PreemptorImplTest extends EasyMockTest {
   private IExpectationSetters<Set<Veto>> expectFiltering() {
     return expect(schedulingFilter.filter(
         EasyMock.<ResourceSlot>anyObject(),
-        EasyMock.eq(HOST_A),
-        EasyMock.eq(MaintenanceMode.NONE),
+        EasyMock.<IHostAttributes>anyObject(),
         EasyMock.<ITaskConfig>anyObject(),
         EasyMock.<String>anyObject(),
         EasyMock.eq(emptyJob))).andAnswer(
@@ -659,6 +661,5 @@ public class PreemptorImplTest extends EasyMockTest {
 
     expect(this.storageUtil.attributeStore.getHostAttributes(host))
         .andReturn(Optional.of(hostAttrs)).anyTimes();
-    expect(this.maintenance.getMode(host)).andReturn(NONE).anyTimes();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index 0e699c9..0e8a98c 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -29,11 +29,13 @@ import com.twitter.common.util.Clock;
 import com.twitter.common.util.testing.FakeClock;
 
 import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
@@ -48,6 +50,7 @@ import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.mem.MemStorage;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
@@ -58,7 +61,6 @@ import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
@@ -69,8 +71,9 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
   private static final IScheduledTask TASK_A = makeTask("a");
   private static final IScheduledTask TASK_B = makeTask("b");
-  private static final HostOffer OFFER =
-      new HostOffer(Offers.makeOffer("OFFER_A", "HOST_A"), MaintenanceMode.NONE);
+  private static final HostOffer OFFER = new HostOffer(
+      Offers.makeOffer("OFFER_A", "HOST_A"),
+      IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE)));
 
   private StorageTestUtil storageUtil;
   private StateManager stateManager;


Mime
View raw message