aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject git commit: Cache the host's maintenance status with offer.
Date Fri, 24 Oct 2014 23:12:02 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 669981d5f -> 3ad0c5fb1


Cache the host's maintenance status with offer.

Bugs closed: AURORA-878

Reviewed at https://reviews.apache.org/r/27100/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/3ad0c5fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/3ad0c5fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/3ad0c5fb

Branch: refs/heads/master
Commit: 3ad0c5fb1c0a705bd2bc11c7a56218146eccc117
Parents: 669981d
Author: Zameer Manji <zmanji@twopensource.com>
Authored: Fri Oct 24 16:11:31 2014 -0700
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Fri Oct 24 16:11:31 2014 -0700

----------------------------------------------------------------------
 .../aurora/scheduler/async/OfferQueue.java      |  95 +++++++-----
 .../aurora/scheduler/async/Preemptor.java       |  56 +++++--
 .../aurora/scheduler/async/TaskScheduler.java   |  15 +-
 .../events/NotifyingSchedulingFilter.java       |   4 +-
 .../scheduler/filter/SchedulingFilter.java      |   3 +
 .../scheduler/filter/SchedulingFilterImpl.java  |  17 ++-
 .../apache/aurora/scheduler/http/Offers.java    |   9 +-
 .../aurora/scheduler/state/TaskAssigner.java    |  17 ++-
 .../scheduler/stats/AsyncStatsModule.java       |  15 +-
 .../scheduler/async/OfferQueueImplTest.java     |  25 ++-
 .../scheduler/async/PreemptorImplTest.java      |  43 ++++--
 .../scheduler/async/TaskSchedulerImplTest.java  |  36 ++---
 .../scheduler/async/TaskSchedulerTest.java      |  99 ++++++------
 .../events/NotifyingSchedulingFilterTest.java   |  10 +-
 .../filter/SchedulingFilterImplTest.java        | 153 +++++++++++--------
 .../scheduler/state/TaskAssignerImplTest.java   |   9 +-
 16 files changed, 365 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
index 92c8438..1a45d08 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
@@ -28,7 +28,6 @@ import javax.inject.Inject;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
-import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
@@ -48,6 +47,8 @@ import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.SlaveID;
 import org.apache.mesos.Protos.TaskInfo;
 
+import static java.util.Objects.requireNonNull;
+
 import static org.apache.aurora.gen.MaintenanceMode.DRAINED;
 import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
 import static org.apache.aurora.gen.MaintenanceMode.NONE;
@@ -82,7 +83,7 @@ public interface OfferQueue extends EventSubscriber {
    * @throws LaunchException If the acceptor accepted an offer, but there was an error launching the
    *                         task.
    */
-  boolean launchFirst(Function<Offer, Optional<TaskInfo>> acceptor) throws LaunchException;
+  boolean launchFirst(Function<HostOffer, Optional<TaskInfo>> acceptor) throws LaunchException;
 
   /**
    * Notifies the offer queue that a host has changed state.
@@ -96,7 +97,7 @@ public interface OfferQueue extends EventSubscriber {
    *
    * @return A snapshot of the offers that the scheduler is currently holding.
    */
-  Iterable<Offer> getOffers();
+  Iterable<HostOffer> getOffers();
 
   /**
    * Calculates the amount of time before an offer should be 'returned' by declining it.
@@ -119,6 +120,51 @@ public interface OfferQueue extends EventSubscriber {
     }
   }
 
+  /**
+   * Encapsulate an offer from a host, and the host's maintenance mode.
+   */
+  class HostOffer {
+    private final Offer offer;
+
+    // TODO(wfarner): Replace this with HostAttributes for more use of this caching.
+    private final MaintenanceMode mode;
+
+    public HostOffer(Offer offer, MaintenanceMode mode) {
+      this.offer = requireNonNull(offer);
+      this.mode = requireNonNull(mode);
+    }
+
+    public Offer getOffer() {
+      return offer;
+    }
+
+    public MaintenanceMode getMode() {
+      return mode;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof HostOffer)) {
+        return false;
+      }
+      HostOffer other = (HostOffer) o;
+      return Objects.equals(offer, other.offer) && mode == other.mode;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(offer, mode);
+    }
+
+    @Override
+    public String toString() {
+      return com.google.common.base.Objects.toStringHelper(this)
+          .add("offer", offer)
+          .add("mode", mode)
+          .toString();
+    }
+  }
+
   class OfferQueueImpl implements OfferQueue {
     private static final Logger LOG = Logger.getLogger(OfferQueueImpl.class.getName());
 
@@ -187,7 +233,7 @@ public interface OfferQueue extends EventSubscriber {
     }
 
     private boolean removeFromHostOffers(final OfferID offerId) {
-      Objects.requireNonNull(offerId);
+      requireNonNull(offerId);
 
       // The small risk of inconsistency is acceptable here - if we have an accept/remove race
       // on an offer, the master will mark the task as LOST and it will be retried.
@@ -195,14 +241,8 @@ public interface OfferQueue extends EventSubscriber {
     }
 
     @Override
-    public Iterable<Offer> getOffers() {
-      return FluentIterable.from(hostOffers.getWeaklyConsistentOffers())
-          .transform(new Function<HostOffer, Offer>() {
-            @Override
-            public Offer apply(HostOffer offer) {
-              return offer.offer;
-            }
-          });
+    public Iterable<HostOffer> getOffers() {
+      return hostOffers.getWeaklyConsistentOffers();
     }
 
     /**
@@ -230,33 +270,6 @@ public interface OfferQueue extends EventSubscriber {
     }
 
     /**
-     * Encapsulate an offer from a host, and the host's maintenance mode.
-     */
-    private static class HostOffer {
-      private final Offer offer;
-      private final MaintenanceMode mode;
-
-      HostOffer(Offer offer, MaintenanceMode mode) {
-        this.offer = offer;
-        this.mode = mode;
-      }
-
-      @Override
-      public boolean equals(Object o) {
-        if (!(o instanceof HostOffer)) {
-          return false;
-        }
-        HostOffer other = (HostOffer) o;
-        return Objects.equals(offer, other.offer) && mode == other.mode;
-      }
-
-      @Override
-      public int hashCode() {
-        return Objects.hash(offer, mode);
-      }
-    }
-
-    /**
      * A container for the data structures used by this class, to make it easier to reason about
      * the different indices used and their consistency.
      */
@@ -326,14 +339,14 @@ public interface OfferQueue extends EventSubscriber {
     }
 
     @Override
-    public boolean launchFirst(Function<Offer, Optional<TaskInfo>> acceptor)
+    public boolean launchFirst(Function<HostOffer, Optional<TaskInfo>> acceptor)
         throws LaunchException {
 
       // It's important that this method is not called concurrently - doing so would open up the
       // possibility of a race between the same offers being accepted by different threads.
 
       for (HostOffer hostOffer : hostOffers.getWeaklyConsistentOffers()) {
-        Optional<TaskInfo> assignment = acceptor.apply(hostOffer.offer);
+        Optional<TaskInfo> assignment = acceptor.apply(hostOffer);
         if (assignment.isPresent()) {
           // Guard against an offer being removed after we grabbed it from the iterator.
           // If that happens, the offer will not exist in hostOffers, and we can immediately

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
index e3e261d..a17738e 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
@@ -42,17 +42,18 @@ import com.twitter.common.quantity.Time;
 import com.twitter.common.stats.Stats;
 import com.twitter.common.util.Clock;
 
+import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.mesos.Protos.Offer;
 
 import static java.lang.annotation.ElementType.FIELD;
 import static java.lang.annotation.ElementType.METHOD;
@@ -62,6 +63,7 @@ import static java.util.Objects.requireNonNull;
 
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
+import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
 import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
 
 /**
@@ -128,6 +130,7 @@ public interface Preemptor {
     private final SchedulingFilter schedulingFilter;
     private final Amount<Long, Time> preemptionCandidacyDelay;
     private final Clock clock;
+    private final MaintenanceController maintenance;
 
     /**
      * Creates a new preemptor.
@@ -147,7 +150,8 @@ public interface Preemptor {
         OfferQueue offerQueue,
         SchedulingFilter schedulingFilter,
         @PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay,
-        Clock clock) {
+        Clock clock,
+        MaintenanceController maintenance) {
 
       this.storage = requireNonNull(storage);
       this.stateManager = requireNonNull(stateManager);
@@ -155,6 +159,7 @@ public interface Preemptor {
       this.schedulingFilter = requireNonNull(schedulingFilter);
       this.preemptionCandidacyDelay = requireNonNull(preemptionCandidacyDelay);
       this.clock = requireNonNull(clock);
+      this.maintenance = requireNonNull(maintenance);
     }
 
     private List<IAssignedTask> fetch(Query.Builder query, Predicate<IScheduledTask> filter) {
@@ -192,19 +197,27 @@ public interface Preemptor {
           }
         };
 
-    private static final Function<Offer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
-        new Function<Offer, ResourceSlot>() {
+    private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
+        new Function<HostOffer, ResourceSlot>() {
           @Override
-          public ResourceSlot apply(Offer offer) {
-            return ResourceSlot.from(offer);
+          public ResourceSlot apply(HostOffer hostOffer) {
+            return ResourceSlot.from(hostOffer.getOffer());
           }
         };
 
-    private static final Function<Offer, String> OFFER_TO_HOST =
-        new Function<Offer, String>() {
+    private static final Function<HostOffer, String> OFFER_TO_HOST =
+        new Function<HostOffer, String>() {
           @Override
-          public String apply(Offer offer) {
-            return offer.getHostname();
+          public String apply(HostOffer hostOffer) {
+            return hostOffer.getOffer().getHostname();
+          }
+        };
+
+    private static final Function<HostOffer, MaintenanceMode> OFFER_TO_MODE =
+        new Function<HostOffer, MaintenanceMode>() {
+          @Override
+          public MaintenanceMode apply(HostOffer hostOffer) {
+            return hostOffer.getMode();
           }
         };
 
@@ -220,7 +233,7 @@ public interface Preemptor {
      */
     private Optional<Set<IAssignedTask>> getTasksToPreempt(
         Iterable<IAssignedTask> possibleVictims,
-        Iterable<Offer> offers,
+        Iterable<HostOffer> offers,
         IAssignedTask pendingTask,
         AttributeAggregate attributeAggregate) {
 
@@ -236,9 +249,19 @@ public interface Preemptor {
           ResourceSlot.sum(Iterables.transform(offers, OFFER_TO_RESOURCE_SLOT));
 
       if (!Iterables.isEmpty(offers)) {
+        if (Iterables.size(offers) > 1) {
+          // There are multiple offers for the same host. Since each offer carries its own
+          // maintenance information, we do not preempt based on it and instead wait for mesos to
+          // merge the offers for us.
+          return Optional.absent();
+        }
+        MaintenanceMode mode =
+            Iterables.getOnlyElement(FluentIterable.from(offers).transform(OFFER_TO_MODE).toSet());
+
         Set<SchedulingFilter.Veto> vetos = schedulingFilter.filter(
             slackResources,
             host,
+            mode,
             pendingTask.getTask(),
             pendingTask.getTaskId(),
             attributeAggregate);
@@ -269,6 +292,7 @@ public interface Preemptor {
         Set<SchedulingFilter.Veto> vetos = schedulingFilter.filter(
             totalResource,
             host,
+            maintenance.getMode(host),
             pendingTask.getTask(),
             pendingTask.getTaskId(),
             attributeAggregate);
@@ -280,11 +304,11 @@ public interface Preemptor {
       return Optional.absent();
     }
 
-    private static final Function<Offer, String> OFFER_TO_SLAVE_ID =
-        new Function<Offer, String>() {
+    private static final Function<HostOffer, String> OFFER_TO_SLAVE_ID =
+        new Function<HostOffer, String>() {
           @Override
-          public String apply(Offer offer) {
-            return offer.getSlaveId().getValue();
+          public String apply(HostOffer hostOffer) {
+            return hostOffer.getOffer().getSlaveId().getValue();
           }
         };
 
@@ -323,7 +347,7 @@ public interface Preemptor {
       attemptedPreemptions.incrementAndGet();
 
       // Group the offers by slave id so they can be paired with active tasks from the same slave.
-      Multimap<String, Offer> slavesToOffers =
+      Multimap<String, HostOffer> slavesToOffers =
           Multimaps.index(offerQueue.getOffers(), OFFER_TO_SLAVE_ID);
 
       Set<String> allSlaves = ImmutableSet.<String>builder()

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index 1189ed0..5739c39 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -54,7 +54,6 @@ import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.SlaveID;
 import org.apache.mesos.Protos.TaskInfo;
 
@@ -66,6 +65,7 @@ import static java.util.Objects.requireNonNull;
 
 import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
 
 /**
  * Enables scheduling and preemption of tasks.
@@ -126,26 +126,27 @@ public interface TaskScheduler extends EventSubscriber {
       this.reservations = new Reservations(reservationDuration, clock);
     }
 
-    private Function<Offer, Optional<TaskInfo>> getAssignerFunction(
+    private Function<HostOffer, Optional<TaskInfo>> getAssignerFunction(
         final AttributeAggregate attributeAggregate,
         final String taskId,
         final IScheduledTask task) {
 
-      return new Function<Offer, Optional<TaskInfo>>() {
+      return new Function<HostOffer, Optional<TaskInfo>>() {
         @Override
-        public Optional<TaskInfo> apply(Offer offer) {
-          Optional<String> reservedTaskId = reservations.getSlaveReservation(offer.getSlaveId());
+        public Optional<TaskInfo> apply(HostOffer hostOffer) {
+          Optional<String> reservedTaskId =
+              reservations.getSlaveReservation(hostOffer.getOffer().getSlaveId());
           if (reservedTaskId.isPresent()) {
             if (taskId.equals(reservedTaskId.get())) {
               // Slave is reserved to satisfy this task.
-              return assigner.maybeAssign(offer, task, attributeAggregate);
+              return assigner.maybeAssign(hostOffer, task, attributeAggregate);
             } else {
               // Slave is reserved for another task.
               return Optional.absent();
             }
           } else {
             // Slave is not reserved.
-            return assigner.maybeAssign(offer, task, attributeAggregate);
+            return assigner.maybeAssign(hostOffer, task, attributeAggregate);
           }
         }
       };

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
index 283f7e1..fc17cac 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
@@ -20,6 +20,7 @@ import java.util.Set;
 import javax.inject.Inject;
 import javax.inject.Qualifier;
 
+import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
@@ -60,11 +61,12 @@ class NotifyingSchedulingFilter implements SchedulingFilter {
   public Set<Veto> filter(
       ResourceSlot offer,
       String slaveHost,
+      MaintenanceMode mode,
       ITaskConfig task,
       String taskId,
       AttributeAggregate jobState) {
 
-    Set<Veto> vetoes = delegate.filter(offer, slaveHost, task, taskId, jobState);
+    Set<Veto> vetoes = delegate.filter(offer, slaveHost, mode, task, taskId, jobState);
     if (!vetoes.isEmpty()) {
       eventSink.post(new Vetoed(taskId, vetoes));
     }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
index 1e3018e..c37272c 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
@@ -18,6 +18,7 @@ import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
@@ -105,6 +106,7 @@ public interface SchedulingFilter {
    *
    * @param offer Resources offered.
    * @param slaveHost Host that the resources are associated with.
+   * @param mode Maintenance mode of the host that the resources are associated with.
    * @param task Task.
    * @param taskId Canonical ID of the task.
    * @param attributeAggregate Attribute information for tasks in the job containing {@code task}.
@@ -114,6 +116,7 @@ public interface SchedulingFilter {
   Set<Veto> filter(
       ResourceSlot offer,
       String slaveHost,
+      MaintenanceMode mode,
       ITaskConfig task,
       String taskId,
       AttributeAggregate attributeAggregate);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
index da29428..0533baa 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
@@ -33,7 +33,6 @@ import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
-import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
@@ -67,18 +66,15 @@ public class SchedulingFilterImpl implements SchedulingFilter {
   private static final Set<MaintenanceMode> VETO_MODES = EnumSet.of(DRAINING, DRAINED);
 
   private final Storage storage;
-  private final MaintenanceController maintenance;
 
   /**
    * Creates a new scheduling filter.
    *
    * @param storage Interface to accessing the task store.
-   * @param maintenance Interface to accessing the maintenance controller
    */
   @Inject
-  public SchedulingFilterImpl(Storage storage, MaintenanceController maintenance) {
+  public SchedulingFilterImpl(Storage storage) {
     this.storage = requireNonNull(storage);
-    this.maintenance = requireNonNull(maintenance);
   }
 
   /**
@@ -230,8 +226,7 @@ public class SchedulingFilterImpl implements SchedulingFilter {
     };
   }
 
-  private Optional<Veto> getMaintenanceVeto(String slaveHost) {
-    MaintenanceMode mode = maintenance.getMode(slaveHost);
+  private Optional<Veto> getMaintenanceVeto(MaintenanceMode mode) {
     return VETO_MODES.contains(mode)
         ? Optional.of(ConstraintFilter.maintenanceVeto(mode.toString().toLowerCase()))
         : NO_VETO;
@@ -261,6 +256,7 @@ public class SchedulingFilterImpl implements SchedulingFilter {
   public Set<Veto> filter(
       ResourceSlot offer,
       String slaveHost,
+      MaintenanceMode mode,
       ITaskConfig task,
       String taskId,
       AttributeAggregate attributeAggregate) {
@@ -268,10 +264,15 @@ public class SchedulingFilterImpl implements SchedulingFilter {
     if (!ConfigurationManager.isDedicated(task) && isDedicated(slaveHost)) {
       return ImmutableSet.of(DEDICATED_HOST_VETO);
     }
+
+    Optional<Veto> maintenanceVeto = getMaintenanceVeto(mode);
+    if (maintenanceVeto.isPresent()) {
+      return maintenanceVeto.asSet();
+    }
+
     return ImmutableSet.<Veto>builder()
         .addAll(getConstraintFilter(attributeAggregate, slaveHost).apply(task))
         .addAll(getResourceVetoes(offer, task))
-        .addAll(getMaintenanceVeto(slaveHost).asSet())
         .build();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/main/java/org/apache/aurora/scheduler/http/Offers.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Offers.java b/src/main/java/org/apache/aurora/scheduler/http/Offers.java
index b7dfeda..446dc74 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/Offers.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/Offers.java
@@ -34,6 +34,8 @@ import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.Resource;
 import org.apache.mesos.Protos.Value.Range;
 
+import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
+
 /**
  * Servlet that exposes resource offers that the scheduler is currently retaining.
  */
@@ -119,10 +121,11 @@ public class Offers {
     return FluentIterable.from(iterable).transform(transform).toList();
   }
 
-  private static final Function<Offer, Map<String, ?>> TO_BEAN =
-      new Function<Offer, Map<String, ?>>() {
+  private static final Function<HostOffer, Map<String, ?>> TO_BEAN =
+      new Function<HostOffer, Map<String, ?>>() {
         @Override
-        public Map<String, ?> apply(Offer offer) {
+        public Map<String, ?> apply(HostOffer hostOffer) {
+          Offer offer = hostOffer.getOffer();
           return ImmutableMap.<String, Object>builder()
               .put("id", offer.getId().getValue())
               .put("framework_id", offer.getFrameworkId().getValue())

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index 4db9be8..78a9670 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -34,6 +34,8 @@ import org.apache.mesos.Protos.TaskInfo;
 
 import static java.util.Objects.requireNonNull;
 
+import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
+
 /**
  * Responsible for matching a task against an offer.
  */
@@ -43,13 +45,13 @@ public interface TaskAssigner {
    * Tries to match a task against an offer.  If a match is found, the assigner should
    * make the appropriate changes to the task and provide a non-empty result.
    *
-   * @param offer The resource offer.
+   * @param hostOffer The resource offer.
    * @param task The task to match against and optionally assign.
    * @param attributeAggregate Attribute information for tasks in the job containing {@code task}.
    * @return Instructions for launching the task if matching and assignment were successful.
    */
   Optional<TaskInfo> maybeAssign(
-      Offer offer,
+      HostOffer hostOffer,
       IScheduledTask task,
       AttributeAggregate attributeAggregate);
 
@@ -87,20 +89,21 @@ public interface TaskAssigner {
 
     @Override
     public Optional<TaskInfo> maybeAssign(
-        Offer offer,
+        HostOffer hostOffer,
         IScheduledTask task,
         AttributeAggregate attributeAggregate) {
 
       Set<Veto> vetoes = filter.filter(
-          ResourceSlot.from(offer),
-          offer.getHostname(),
+          ResourceSlot.from(hostOffer.getOffer()),
+          hostOffer.getOffer().getHostname(),
+          hostOffer.getMode(),
           task.getAssignedTask().getTask(),
           Tasks.id(task),
           attributeAggregate);
       if (vetoes.isEmpty()) {
-        return Optional.of(assign(offer, task));
+        return Optional.of(assign(hostOffer.getOffer(), task));
       } else {
-        LOG.fine("Slave " + offer.getHostname() + " vetoed task " + Tasks.id(task)
+        LOG.fine("Slave " + hostOffer.getOffer().getHostname() + " vetoed task " + Tasks.id(task)
             + ": " + vetoes);
         return Optional.absent();
       }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
index fc0f496..844a38a 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
@@ -42,7 +42,6 @@ import org.apache.aurora.scheduler.configuration.Resources;
 import org.apache.aurora.scheduler.stats.SlotSizeCounter.MachineResource;
 import org.apache.aurora.scheduler.stats.SlotSizeCounter.MachineResourceProvider;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.mesos.Protos.Offer;
 
 import static java.lang.annotation.ElementType.FIELD;
 import static java.lang.annotation.ElementType.METHOD;
@@ -50,6 +49,8 @@ import static java.lang.annotation.ElementType.PARAMETER;
 import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
 
+import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
+
 /**
  * Module to configure export of cluster-wide resource allocation and consumption statistics.
  */
@@ -111,16 +112,16 @@ public class AsyncStatsModule extends AbstractModule {
   }
 
   static class OfferAdapter implements MachineResourceProvider {
-    private static final Function<Offer, MachineResource> TO_RESOURCE =
-        new Function<Offer, MachineResource>() {
+    private static final Function<HostOffer, MachineResource> TO_RESOURCE =
+        new Function<HostOffer, MachineResource>() {
           @Override
-          public MachineResource apply(Offer offer) {
-            Resources resources = Resources.from(offer);
+          public MachineResource apply(HostOffer hostOffer) {
+            Resources resources = Resources.from(hostOffer.getOffer());
             IResourceAggregate quota = IResourceAggregate.build(new ResourceAggregate()
                 .setNumCpus(resources.getNumCpus())
                 .setRamMb(resources.getRam().as(Data.MB))
                 .setDiskMb(resources.getDisk().as(Data.MB)));
-            return new MachineResource(quota, Conversions.isDedicated(offer));
+            return new MachineResource(quota, Conversions.isDedicated(hostOffer.getOffer()));
           }
         };
 
@@ -133,7 +134,7 @@ public class AsyncStatsModule extends AbstractModule {
 
     @Override
     public Iterable<MachineResource> get() {
-      Iterable<Offer> offers = offerQueue.getOffers();
+      Iterable<HostOffer> offers = offerQueue.getOffers();
       return FluentIterable.from(offers).transform(TO_RESOURCE);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
index ddd24c3..15fb7ff 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
@@ -42,6 +42,7 @@ import org.easymock.IAnswer;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertFalse;
 
@@ -59,7 +60,7 @@ public class OfferQueueImplTest extends EasyMockTest {
   private ScheduledExecutorService executor;
   private ExecutorService testExecutor;
   private MaintenanceController maintenanceController;
-  private Function<Offer, Optional<TaskInfo>> offerAcceptor;
+  private Function<HostOffer, Optional<TaskInfo>> offerAcceptor;
   private OfferQueueImpl offerQueue;
 
   @Before
@@ -76,7 +77,7 @@ public class OfferQueueImplTest extends EasyMockTest {
       }
     });
     maintenanceController = createMock(MaintenanceController.class);
-    offerAcceptor = createMock(new Clazz<Function<Offer, Optional<TaskInfo>>>() { });
+    offerAcceptor = createMock(new Clazz<Function<HostOffer, Optional<TaskInfo>>>() { });
     OfferReturnDelay returnDelay = new OfferReturnDelay() {
       @Override
       public Amount<Integer, Time> get() {
@@ -127,12 +128,20 @@ public class OfferQueueImplTest extends EasyMockTest {
 
   @Test
   public void testOffersSorted() throws Exception {
-    expect(maintenanceController.getMode(HOST_A)).andReturn(MaintenanceMode.NONE);
-    expect(maintenanceController.getMode(HOST_B)).andReturn(MaintenanceMode.DRAINING);
-    expect(maintenanceController.getMode(HOST_C)).andReturn(MaintenanceMode.NONE);
-    expect(offerAcceptor.apply(OFFER_A)).andReturn(Optional.<TaskInfo>absent());
-    expect(offerAcceptor.apply(OFFER_C)).andReturn(Optional.<TaskInfo>absent());
-    expect(offerAcceptor.apply(OFFER_B)).andReturn(Optional.<TaskInfo>absent());
+    MaintenanceMode modeA = MaintenanceMode.NONE;
+    MaintenanceMode modeB = MaintenanceMode.DRAINING;
+    MaintenanceMode modeC = MaintenanceMode.NONE;
+
+    HostOffer hostOfferA = new HostOffer(OFFER_A, modeA);
+    HostOffer hostOfferB = new HostOffer(OFFER_B, modeB);
+    HostOffer hostOfferC = new HostOffer(OFFER_C, modeC);
+
+    expect(maintenanceController.getMode(HOST_A)).andReturn(modeA);
+    expect(maintenanceController.getMode(HOST_B)).andReturn(modeB);
+    expect(maintenanceController.getMode(HOST_C)).andReturn(modeC);
+    expect(offerAcceptor.apply(hostOfferA)).andReturn(Optional.<TaskInfo>absent());
+    expect(offerAcceptor.apply(hostOfferB)).andReturn(Optional.<TaskInfo>absent());
+    expect(offerAcceptor.apply(hostOfferC)).andReturn(Optional.<TaskInfo>absent());
 
     control.replay();
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
index 18c21e3..c0fa462 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
@@ -18,6 +18,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.FluentIterable;
@@ -36,6 +37,7 @@ import org.apache.aurora.gen.Constraint;
 import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
@@ -68,6 +70,7 @@ import static org.apache.aurora.gen.MaintenanceMode.NONE;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
 import static org.apache.aurora.scheduler.async.Preemptor.PreemptorImpl;
 import static org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import static org.apache.mesos.Protos.Offer;
@@ -124,7 +127,8 @@ public class PreemptorImplTest extends EasyMockTest {
         offerQueue,
         schedulingFilter,
         PREEMPTION_DELAY,
-        clock);
+        clock,
+        maintenance);
 
     preemptor.findPreemptionSlotFor(pendingTask.getAssignedTask().getTaskId(), emptyJob);
   }
@@ -146,6 +150,10 @@ public class PreemptorImplTest extends EasyMockTest {
         IScheduledTask.setFromBuilders(Arrays.asList(returnedTasks)));
   }
 
+  private void expectGetMaintenance(String host) {
+    expect(maintenance.getMode(host)).andReturn(MaintenanceMode.NONE);
+  }
+
   @Test
   public void testPreempted() throws Exception {
     schedulingFilter = createMock(SchedulingFilter.class);
@@ -160,6 +168,8 @@ public class PreemptorImplTest extends EasyMockTest {
     expectGetPendingTasks(highPriority);
     expectGetActiveTasks(lowPriority);
 
+    expectGetMaintenance(HOST_A);
+
     expectFiltering();
     expectPreempted(lowPriority);
 
@@ -184,6 +194,7 @@ public class PreemptorImplTest extends EasyMockTest {
     expectGetPendingTasks(highPriority);
     expectGetActiveTasks(lowerPriority, lowerPriority);
 
+    expectGetMaintenance(HOST_A);
     expectFiltering();
     expectPreempted(lowerPriority);
 
@@ -211,6 +222,7 @@ public class PreemptorImplTest extends EasyMockTest {
     expectGetPendingTasks(pendingPriority);
     expectGetActiveTasks(highPriority, lowerPriority, lowestPriority);
 
+    expectGetMaintenance(HOST_A);
     expectFiltering();
     expectPreempted(lowestPriority);
 
@@ -251,6 +263,7 @@ public class PreemptorImplTest extends EasyMockTest {
     expectGetPendingTasks(p1);
     expectGetActiveTasks(a1);
 
+    expectGetMaintenance(HOST_A);
     expectFiltering();
     expectPreempted(a1);
 
@@ -273,6 +286,7 @@ public class PreemptorImplTest extends EasyMockTest {
     expectGetPendingTasks(p1);
     expectGetActiveTasks(a1);
 
+    expectGetMaintenance(HOST_A);
     expectFiltering();
     expectPreempted(a1);
 
@@ -301,7 +315,7 @@ public class PreemptorImplTest extends EasyMockTest {
   // Ensures a production task can preempt 2 tasks on the same host.
   @Test
   public void testProductionPreemptingManyNonProduction() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl(storageUtil.storage, maintenance);
+    schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
     ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
     a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
 
@@ -333,7 +347,7 @@ public class PreemptorImplTest extends EasyMockTest {
   // Ensures we select the minimal number of tasks to preempt
   @Test
   public void testMinimalSetPreempted() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl(storageUtil.storage, maintenance);
+    schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
     ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
     a1.getAssignedTask().getTask().setNumCpus(4).setRamMb(4096);
 
@@ -368,7 +382,7 @@ public class PreemptorImplTest extends EasyMockTest {
   // Ensures a production task *never* preempts a production task from another job.
   @Test
   public void testProductionJobNeverPreemptsProductionJob() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl(storageUtil.storage, maintenance);
+    schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
     ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1");
     p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
 
@@ -393,7 +407,7 @@ public class PreemptorImplTest extends EasyMockTest {
   // Ensures that we can preempt if a task + offer can satisfy a pending task.
   @Test
   public void testPreemptWithOfferAndTask() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl(storageUtil.storage, maintenance);
+    schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
 
     setUpHost(HOST_A, RACK_A);
 
@@ -421,7 +435,7 @@ public class PreemptorImplTest extends EasyMockTest {
   // Ensures we can preempt if two tasks and an offer can satisfy a pending task.
   @Test
   public void testPreemptWithOfferAndMultipleTasks() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl(storageUtil.storage, maintenance);
+    schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
 
     setUpHost(HOST_A, RACK_A);
 
@@ -454,7 +468,7 @@ public class PreemptorImplTest extends EasyMockTest {
   // Ensures we don't preempt if a host has enough slack to satisfy a pending task.
   @Test
   public void testPreemptWithLargeOffer() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl(storageUtil.storage, maintenance);
+    schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
 
     setUpHost(HOST_A, RACK_A);
 
@@ -509,7 +523,8 @@ public class PreemptorImplTest extends EasyMockTest {
         offerQueue,
         schedulingFilter,
         PREEMPTION_DELAY,
-        clock);
+        clock,
+        maintenance);
 
     assertEquals(
         Optional.<String>absent(),
@@ -537,17 +552,25 @@ public class PreemptorImplTest extends EasyMockTest {
   }
 
   private void expectOffers(Offer ... offers) {
-    expect(offerQueue.getOffers()).andReturn(Lists.newArrayList(offers));
+    Iterable<HostOffer> hostOffers = FluentIterable.from(Lists.newArrayList(offers))
+        .transform(new Function<Offer, HostOffer>() {
+          @Override
+          public HostOffer apply(Offer offer) {
+            return new HostOffer(offer, MaintenanceMode.NONE);
+          }
+        });
+    expect(offerQueue.getOffers()).andReturn(hostOffers);
   }
 
   private void expectNoOffers() {
-    expect(offerQueue.getOffers()).andReturn(ImmutableList.<Offer>of());
+    expect(offerQueue.getOffers()).andReturn(ImmutableList.<HostOffer>of());
   }
 
   private IExpectationSetters<Set<Veto>> expectFiltering() {
     return expect(schedulingFilter.filter(
         EasyMock.<ResourceSlot>anyObject(),
         EasyMock.eq(HOST_A),
+        EasyMock.eq(MaintenanceMode.NONE),
         EasyMock.<ITaskConfig>anyObject(),
         EasyMock.<String>anyObject(),
         EasyMock.eq(emptyJob))).andAnswer(

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index bb0b3b2..5d12df9 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -29,6 +29,7 @@ import com.twitter.common.util.testing.FakeClock;
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
@@ -47,7 +48,6 @@ import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.mem.MemStorage;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.TaskInfo;
 import org.easymock.Capture;
 import org.junit.Before;
@@ -55,6 +55,7 @@ import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
@@ -65,7 +66,8 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
   private static final IScheduledTask TASK_A = makeTask("a");
   private static final IScheduledTask TASK_B = makeTask("b");
-  private static final Offer OFFER = Offers.makeOffer("OFFER_A", "HOST_A");
+  private static final HostOffer OFFER =
+      new HostOffer(Offers.makeOffer("OFFER_A", "HOST_A"), MaintenanceMode.NONE);
 
   private StorageTestUtil storageUtil;
   private StateManager stateManager;
@@ -135,16 +137,16 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expectLaunchAttempt(false);
     // Reserve "a" with offerA
     expect(preemptor.findPreemptionSlotFor("a", emptyJob))
-        .andReturn(Optional.of(OFFER.getSlaveId().getValue()));
+        .andReturn(Optional.of(OFFER.getOffer().getSlaveId().getValue()));
 
     expectTaskStillPendingQuery(TASK_B);
     expectActiveJobFetch(TASK_B);
-    Capture<Function<Offer, Optional<TaskInfo>>> firstAssignment = expectLaunchAttempt(false);
+    Capture<Function<HostOffer, Optional<TaskInfo>>> firstAssignment = expectLaunchAttempt(false);
     expect(preemptor.findPreemptionSlotFor("b", emptyJob)).andReturn(Optional.<String>absent());
 
     expectTaskStillPendingQuery(TASK_B);
     expectActiveJobFetch(TASK_B);
-    Capture<Function<Offer, Optional<TaskInfo>>> secondAssignment = expectLaunchAttempt(true);
+    Capture<Function<HostOffer, Optional<TaskInfo>>> secondAssignment = expectLaunchAttempt(true);
     expectAssigned(TASK_B);
 
     control.replay();
@@ -170,17 +172,17 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expectLaunchAttempt(false);
     // Reserve "a" with offerA
     expect(preemptor.findPreemptionSlotFor("a", emptyJob))
-        .andReturn(Optional.of(OFFER.getSlaveId().getValue()));
+        .andReturn(Optional.of(OFFER.getOffer().getSlaveId().getValue()));
 
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
-    Capture<Function<Offer, Optional<TaskInfo>>> firstAssignment = expectLaunchAttempt(true);
+    Capture<Function<HostOffer, Optional<TaskInfo>>> firstAssignment = expectLaunchAttempt(true);
     expectAssigned(TASK_A);
 
     expectTaskStillPendingQuery(TASK_B);
     expectActiveJobFetch(TASK_B);
 
-    Capture<Function<Offer, Optional<TaskInfo>>> secondAssignment = expectLaunchAttempt(true);
+    Capture<Function<HostOffer, Optional<TaskInfo>>> secondAssignment = expectLaunchAttempt(true);
 
     expect(assigner.maybeAssign(OFFER, TASK_B, emptyJob))
         .andReturn(Optional.of(TaskInfo.getDefaultInstance()));
@@ -203,11 +205,11 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expectLaunchAttempt(false);
     // Reserve "a" with offerA
     expect(preemptor.findPreemptionSlotFor("a", emptyJob))
-        .andReturn(Optional.of(OFFER.getSlaveId().getValue()));
+        .andReturn(Optional.of(OFFER.getOffer().getSlaveId().getValue()));
 
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
-    Capture<Function<Offer, Optional<TaskInfo>>> firstAssignment = expectLaunchAttempt(true);
+    Capture<Function<HostOffer, Optional<TaskInfo>>> firstAssignment = expectLaunchAttempt(true);
     expectAssigned(TASK_A);
 
     control.replay();
@@ -228,11 +230,11 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     // Reserve "a" with offerA
     expect(preemptor.findPreemptionSlotFor("a", emptyJob))
-        .andReturn(Optional.of(OFFER.getSlaveId().getValue()));
+        .andReturn(Optional.of(OFFER.getOffer().getSlaveId().getValue()));
 
     expectTaskStillPendingQuery(TASK_B);
     expectActiveJobFetch(TASK_B);
-    Capture<Function<Offer, Optional<TaskInfo>>> assignment = expectLaunchAttempt(true);
+    Capture<Function<HostOffer, Optional<TaskInfo>>> assignment = expectLaunchAttempt(true);
     expectAssigned(TASK_B);
 
     control.replay();
@@ -253,11 +255,11 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expectLaunchAttempt(false);
     // Reserve "b" with offer1
     expect(preemptor.findPreemptionSlotFor("b", emptyJob))
-        .andReturn(Optional.of(OFFER.getSlaveId().getValue()));
+        .andReturn(Optional.of(OFFER.getOffer().getSlaveId().getValue()));
 
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
-    Capture<Function<Offer, Optional<TaskInfo>>> firstAssignment = expectLaunchAttempt(true);
+    Capture<Function<HostOffer, Optional<TaskInfo>>> firstAssignment = expectLaunchAttempt(true);
     expectAssigned(TASK_A);
 
     control.replay();
@@ -291,7 +293,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
       }
     });
 
-    Capture<Function<Offer, Optional<TaskInfo>>> assignment = expectLaunchAttempt(true);
+    Capture<Function<HostOffer, Optional<TaskInfo>>> assignment = expectLaunchAttempt(true);
     expect(assigner.maybeAssign(OFFER, taskA, emptyJob))
         .andReturn(Optional.of(TaskInfo.getDefaultInstance()));
 
@@ -313,9 +315,9 @@ public class TaskSchedulerImplTest extends EasyMockTest {
                 .setEnvironment("env-" + taskId))));
   }
 
-  private Capture<Function<Offer, Optional<TaskInfo>>> expectLaunchAttempt(boolean taskLaunched)
+  private Capture<Function<HostOffer, Optional<TaskInfo>>> expectLaunchAttempt(boolean taskLaunched)
       throws OfferQueue.LaunchException {
-        Capture<Function<Offer, Optional<TaskInfo>>> assignment = createCapture();
+        Capture<Function<HostOffer, Optional<TaskInfo>>> assignment = createCapture();
         expect(offerQueue.launchFirst(capture(assignment))).andReturn(taskLaunched);
         return assignment;
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index ec60880..fe57375 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -74,6 +74,7 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLED;
 import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
@@ -88,10 +89,14 @@ public class TaskSchedulerTest extends EasyMockTest {
 
   private static final long FIRST_SCHEDULE_DELAY_MS = 1L;
 
-  private static final Offer OFFER_A = Offers.makeOffer("OFFER_A", "HOST_A");
-  private static final Offer OFFER_B = Offers.makeOffer("OFFER_B", "HOST_B");
-  private static final Offer OFFER_C = Offers.makeOffer("OFFER_C", "HOST_C");
-  private static final Offer OFFER_D = Offers.makeOffer("OFFER_D", "HOST_D");
+  private static final HostOffer OFFER_A =
+      new HostOffer(Offers.makeOffer("OFFER_A", "HOST_A"), MaintenanceMode.NONE);
+  private static final HostOffer OFFER_B =
+      new HostOffer(Offers.makeOffer("OFFER_B", "HOST_B"), MaintenanceMode.SCHEDULED);
+  private static final HostOffer OFFER_C =
+      new HostOffer(Offers.makeOffer("OFFER_C", "HOST_C"), MaintenanceMode.DRAINING);
+  private static final HostOffer OFFER_D =
+      new HostOffer(Offers.makeOffer("OFFER_D", "HOST_D"), MaintenanceMode.DRAINED);
 
   private Storage storage;
 
@@ -204,8 +209,8 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     replayAndCreateScheduler();
 
-    offerQueue.addOffer(OFFER_A);
-    offerQueue.addOffer(OFFER_B);
+    offerQueue.addOffer(OFFER_A.getOffer());
+    offerQueue.addOffer(OFFER_B.getOffer());
   }
 
   @Test
@@ -283,7 +288,7 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
     expect(assigner.maybeAssign(OFFER_A, task, emptyJob)).andReturn(Optional.of(mesosTask));
-    driver.launchTask(OFFER_A.getId(), mesosTask);
+    driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
 
     Capture<Runnable> timeoutCapture3 = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
     expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
@@ -291,7 +296,7 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     replayAndCreateScheduler();
 
-    offerQueue.addOffer(OFFER_A);
+    offerQueue.addOffer(OFFER_A.getOffer());
     changeState(task, INIT, PENDING);
     timeoutCapture.getValue().run();
     timeoutCapture2.getValue().run();
@@ -314,7 +319,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     expectAnyMaintenanceCalls();
     expectOfferDeclineIn(10);
     expect(assigner.maybeAssign(OFFER_A, task, emptyJob)).andReturn(Optional.of(mesosTask));
-    driver.launchTask(OFFER_A.getId(), mesosTask);
+    driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
     expectLastCall().andThrow(new IllegalStateException("Driver not ready."));
     expect(stateManager.changeState(
         "a",
@@ -326,7 +331,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     replayAndCreateScheduler();
 
     changeState(task, INIT, PENDING);
-    offerQueue.addOffer(OFFER_A);
+    offerQueue.addOffer(OFFER_A.getOffer());
     timeoutCapture.getValue().run();
   }
 
@@ -347,13 +352,13 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
     expect(assigner.maybeAssign(OFFER_A, task, emptyJob)).andReturn(Optional.of(mesosTask));
-    driver.launchTask(OFFER_A.getId(), mesosTask);
+    driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
     expectLastCall();
 
     replayAndCreateScheduler();
 
     changeState(task, INIT, PENDING);
-    offerQueue.addOffer(OFFER_A);
+    offerQueue.addOffer(OFFER_A.getOffer());
     timeoutCapture.getValue().run();
     timeoutCapture2.getValue().run();
   }
@@ -368,14 +373,14 @@ public class TaskSchedulerTest extends EasyMockTest {
     expect(assigner.maybeAssign(OFFER_A, task, emptyJob)).andReturn(Optional.<TaskInfo>absent());
     Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
     expect(preemptor.findPreemptionSlotFor("a", emptyJob)).andReturn(Optional.<String>absent());
-    driver.declineOffer(OFFER_A.getId());
+    driver.declineOffer(OFFER_A.getOffer().getId());
     expectTaskGroupBackoff(10, 20);
     expect(preemptor.findPreemptionSlotFor("a", emptyJob)).andReturn(Optional.<String>absent());
 
     replayAndCreateScheduler();
 
     changeState(task, INIT, PENDING);
-    offerQueue.addOffer(OFFER_A);
+    offerQueue.addOffer(OFFER_A.getOffer());
     timeoutCapture.getValue().run();
     offerExpirationCapture.getValue().run();
     timeoutCapture2.getValue().run();
@@ -387,14 +392,14 @@ public class TaskSchedulerTest extends EasyMockTest {
     Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
 
     Offer offerAB =
-        Offers.makeOffer("OFFER_B").toBuilder().setSlaveId(OFFER_A.getSlaveId()).build();
+        Offers.makeOffer("OFFER_B").toBuilder().setSlaveId(OFFER_A.getOffer().getSlaveId()).build();
 
-    driver.declineOffer(OFFER_A.getId());
+    driver.declineOffer(OFFER_A.getOffer().getId());
     driver.declineOffer(offerAB.getId());
 
     replayAndCreateScheduler();
 
-    offerQueue.addOffer(OFFER_A);
+    offerQueue.addOffer(OFFER_A.getOffer());
     offerQueue.addOffer(offerAB);
     offerExpirationCapture.getValue().run();
   }
@@ -404,15 +409,15 @@ public class TaskSchedulerTest extends EasyMockTest {
     expectAnyMaintenanceCalls();
     Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
 
-    Function<Offer, Optional<TaskInfo>> offerAcceptor =
-        createMock(new Clazz<Function<Offer, Optional<TaskInfo>>>() { });
+    Function<HostOffer, Optional<TaskInfo>> offerAcceptor =
+        createMock(new Clazz<Function<HostOffer, Optional<TaskInfo>>>() { });
     final TaskInfo taskInfo = TaskInfo.getDefaultInstance();
     expect(offerAcceptor.apply(OFFER_A)).andReturn(Optional.of(taskInfo));
-    driver.launchTask(OFFER_A.getId(), taskInfo);
+    driver.launchTask(OFFER_A.getOffer().getId(), taskInfo);
 
     replayAndCreateScheduler();
 
-    offerQueue.addOffer(OFFER_A);
+    offerQueue.addOffer(OFFER_A.getOffer());
     offerQueue.launchFirst(offerAcceptor);
     offerExpirationCapture.getValue().run();
   }
@@ -420,32 +425,32 @@ public class TaskSchedulerTest extends EasyMockTest {
   @Test
   public void testBasicMaintenancePreferences() {
     expectOffer();
-    expect(maintenance.getMode("HOST_D")).andReturn(MaintenanceMode.DRAINED);
+    expect(maintenance.getMode("HOST_D")).andReturn(OFFER_D.getMode());
     expectOffer();
-    expect(maintenance.getMode("HOST_C")).andReturn(MaintenanceMode.DRAINING);
+    expect(maintenance.getMode("HOST_C")).andReturn(OFFER_C.getMode());
     expectOffer();
-    expect(maintenance.getMode("HOST_B")).andReturn(MaintenanceMode.SCHEDULED);
+    expect(maintenance.getMode("HOST_B")).andReturn(OFFER_B.getMode());
     expectOffer();
-    expect(maintenance.getMode("HOST_A")).andReturn(MaintenanceMode.NONE);
+    expect(maintenance.getMode("HOST_A")).andReturn(OFFER_A.getMode());
 
     IScheduledTask taskA = makeTask("A", PENDING);
     TaskInfo mesosTaskA = makeTaskInfo(taskA);
     expect(assigner.maybeAssign(OFFER_A, taskA, emptyJob)).andReturn(Optional.of(mesosTaskA));
-    driver.launchTask(OFFER_A.getId(), mesosTaskA);
+    driver.launchTask(OFFER_A.getOffer().getId(), mesosTaskA);
     Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
 
     IScheduledTask taskB = makeTask("B", PENDING);
     TaskInfo mesosTaskB = makeTaskInfo(taskB);
     expect(assigner.maybeAssign(OFFER_B, taskB, emptyJob)).andReturn(Optional.of(mesosTaskB));
-    driver.launchTask(OFFER_B.getId(), mesosTaskB);
+    driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskB);
     Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
 
     replayAndCreateScheduler();
 
-    offerQueue.addOffer(OFFER_D);
-    offerQueue.addOffer(OFFER_C);
-    offerQueue.addOffer(OFFER_B);
-    offerQueue.addOffer(OFFER_A);
+    offerQueue.addOffer(OFFER_D.getOffer());
+    offerQueue.addOffer(OFFER_C.getOffer());
+    offerQueue.addOffer(OFFER_B.getOffer());
+    offerQueue.addOffer(OFFER_A.getOffer());
 
     changeState(taskA, INIT, PENDING);
     captureA.getValue().run();
@@ -457,29 +462,31 @@ public class TaskSchedulerTest extends EasyMockTest {
   @Test
   public void testChangingMaintenancePreferences() {
     expectOffer();
-    expect(maintenance.getMode("HOST_A")).andReturn(MaintenanceMode.NONE);
+    expect(maintenance.getMode("HOST_A")).andReturn(OFFER_A.getMode());
     expectOffer();
-    expect(maintenance.getMode("HOST_B")).andReturn(MaintenanceMode.SCHEDULED);
+    expect(maintenance.getMode("HOST_B")).andReturn(OFFER_B.getMode());
     expectOffer();
-    expect(maintenance.getMode("HOST_C")).andReturn(MaintenanceMode.DRAINED);
+    expect(maintenance.getMode("HOST_C")).andReturn(OFFER_C.getMode());
 
     IScheduledTask taskA = makeTask("A", PENDING);
     TaskInfo mesosTaskA = makeTaskInfo(taskA);
     expect(assigner.maybeAssign(OFFER_B, taskA, emptyJob)).andReturn(Optional.of(mesosTaskA));
-    driver.launchTask(OFFER_B.getId(), mesosTaskA);
+    driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskA);
     Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
 
     IScheduledTask taskB = makeTask("B", PENDING);
     TaskInfo mesosTaskB = makeTaskInfo(taskB);
-    expect(assigner.maybeAssign(OFFER_C, taskB, emptyJob)).andReturn(Optional.of(mesosTaskB));
-    driver.launchTask(OFFER_C.getId(), mesosTaskB);
+    HostOffer updatedOfferC =
+        new HostOffer(OFFER_C.getOffer(), MaintenanceMode.NONE);
+    expect(assigner.maybeAssign(updatedOfferC, taskB, emptyJob)).andReturn(Optional.of(mesosTaskB));
+    driver.launchTask(OFFER_C.getOffer().getId(), mesosTaskB);
     Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
 
     replayAndCreateScheduler();
 
-    offerQueue.addOffer(OFFER_A);
-    offerQueue.addOffer(OFFER_B);
-    offerQueue.addOffer(OFFER_C);
+    offerQueue.addOffer(OFFER_A.getOffer());
+    offerQueue.addOffer(OFFER_B.getOffer());
+    offerQueue.addOffer(OFFER_C.getOffer());
 
     // Initially, we'd expect the offers to be consumed in order (A, B), with (C) unschedulable
 
@@ -498,7 +505,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     TaskInfo mesosTask = makeTaskInfo(task);
     Capture<IScheduledTask> taskScheduled = createCapture();
     expect(assigner.maybeAssign(
-        EasyMock.<Offer>anyObject(),
+        EasyMock.<HostOffer>anyObject(),
         capture(taskScheduled),
         EasyMock.eq(emptyJob)))
         .andReturn(Optional.of(mesosTask));
@@ -543,10 +550,10 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     replayAndCreateScheduler();
 
-    offerQueue.addOffer(OFFER_A);
-    offerQueue.addOffer(OFFER_B);
-    offerQueue.addOffer(OFFER_C);
-    offerQueue.addOffer(OFFER_D);
+    offerQueue.addOffer(OFFER_A.getOffer());
+    offerQueue.addOffer(OFFER_B.getOffer());
+    offerQueue.addOffer(OFFER_C.getOffer());
+    offerQueue.addOffer(OFFER_D.getOffer());
     changeState(jobA0, INIT, PENDING);
     changeState(jobA1, INIT, PENDING);
     changeState(jobA2, INIT, PENDING);
@@ -572,7 +579,7 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     replayAndCreateScheduler();
 
-    offerQueue.addOffer(OFFER_A);
+    offerQueue.addOffer(OFFER_A.getOffer());
     changeState(task, INIT, PENDING);
     timeoutCapture.getValue().run();
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
index 4065629..0318179 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
@@ -19,6 +19,7 @@ import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableSet;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
+import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
@@ -43,6 +44,7 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
   private static final ResourceSlot TASK_RESOURCES = ResourceSlot.from(TASK);
   private static final String TASK_ID = "taskId";
   private static final String SLAVE = "slaveHost";
+  private static final MaintenanceMode MODE = MaintenanceMode.NONE;
 
   private static final Veto VETO_1 = new Veto("veto1", 1);
   private static final Veto VETO_2 = new Veto("veto2", 2);
@@ -65,21 +67,21 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
   @Test
   public void testEvents() {
     Set<Veto> vetoes = ImmutableSet.of(VETO_1, VETO_2);
-    expect(delegate.filter(TASK_RESOURCES, SLAVE, TASK, TASK_ID, emptyJob)).andReturn(vetoes);
+    expect(delegate.filter(TASK_RESOURCES, SLAVE, MODE, TASK, TASK_ID, emptyJob)).andReturn(vetoes);
     eventSink.post(new Vetoed(TASK_ID, vetoes));
 
     control.replay();
 
-    assertEquals(vetoes, filter.filter(TASK_RESOURCES, SLAVE, TASK, TASK_ID, emptyJob));
+    assertEquals(vetoes, filter.filter(TASK_RESOURCES, SLAVE, MODE, TASK, TASK_ID, emptyJob));
   }
 
   @Test
   public void testNoVetoes() {
     Set<Veto> vetoes = ImmutableSet.of();
-    expect(delegate.filter(TASK_RESOURCES, SLAVE, TASK, TASK_ID, emptyJob)).andReturn(vetoes);
+    expect(delegate.filter(TASK_RESOURCES, SLAVE, MODE, TASK, TASK_ID, emptyJob)).andReturn(vetoes);
 
     control.replay();
 
-    assertEquals(vetoes, filter.filter(TASK_RESOURCES, SLAVE, TASK, TASK_ID, emptyJob));
+    assertEquals(vetoes, filter.filter(TASK_RESOURCES, SLAVE, MODE, TASK, TASK_ID, emptyJob));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index 6a9c4ee..bffbf83 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -40,7 +40,6 @@ import org.apache.aurora.gen.ValueConstraint;
 import org.apache.aurora.scheduler.ResourceSlot;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
-import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
@@ -99,13 +98,13 @@ public class SchedulingFilterImplTest extends EasyMockTest {
       Amount.of(DEFAULT_RAM, Data.MB),
       Amount.of(DEFAULT_DISK, Data.MB),
       0);
+  private static final MaintenanceMode DEFAULT_MODE = MaintenanceMode.NONE;
 
   private AttributeAggregate emptyJob;
 
   private final AtomicLong taskIdCounter = new AtomicLong();
 
   private SchedulingFilter defaultFilter;
-  private MaintenanceController maintenance;
   private Storage storage;
   private StoreProvider storeProvider;
   private AttributeStore.Mutable attributeStore;
@@ -113,8 +112,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   @Before
   public void setUp() throws Exception {
     storage = createMock(Storage.class);
-    maintenance = createMock(MaintenanceController.class);
-    defaultFilter = new SchedulingFilterImpl(storage, maintenance);
+    defaultFilter = new SchedulingFilterImpl(storage);
     storeProvider = createMock(StoreProvider.class);
     attributeStore = createMock(AttributeStore.Mutable.class);
     emptyJob = new AttributeAggregate(
@@ -141,7 +139,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   @Test
   public void testMeetsOffer() throws Exception {
     expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
-    expectGetHostMaintenanceStatus(HOST_A).times(2);
 
     control.replay();
 
@@ -152,7 +149,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   @Test
   public void testSufficientPorts() throws Exception {
     expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
-    expectGetHostMaintenanceStatus(HOST_A).times(4);
 
     control.replay();
 
@@ -176,33 +172,35 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         .setRequestedPorts(ImmutableSet.of("one", "two", "three")));
 
     Set<Veto> none = ImmutableSet.of();
-    assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, noPortTask, TASK_ID, emptyJob));
-    assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, onePortTask, TASK_ID, emptyJob));
-    assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, twoPortTask, TASK_ID, emptyJob));
+    assertEquals(none,
+        defaultFilter.filter(twoPorts, HOST_A, DEFAULT_MODE, noPortTask, TASK_ID, emptyJob));
+    assertEquals(none,
+        defaultFilter.filter(twoPorts, HOST_A, DEFAULT_MODE, onePortTask, TASK_ID, emptyJob));
+    assertEquals(none,
+        defaultFilter.filter(twoPorts, HOST_A, DEFAULT_MODE, twoPortTask, TASK_ID, emptyJob));
     assertEquals(
         ImmutableSet.of(PORTS.veto(1)),
-        defaultFilter.filter(twoPorts, HOST_A, threePortTask, TASK_ID, emptyJob));
+        defaultFilter.filter(twoPorts, HOST_A, DEFAULT_MODE, threePortTask, TASK_ID, emptyJob));
   }
 
   @Test
   public void testInsufficientResources() throws Exception {
     expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
-    expectGetHostMaintenanceStatus(HOST_A).times(4);
 
     control.replay();
 
     assertVetoes(
         makeTask(DEFAULT_CPUS + 1, DEFAULT_RAM + 1, DEFAULT_DISK + 1),
+        DEFAULT_MODE,
         CPU.veto(1), DISK.veto(1), RAM.veto(1));
-    assertVetoes(makeTask(DEFAULT_CPUS + 1, DEFAULT_RAM, DEFAULT_DISK), CPU.veto(1));
-    assertVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM + 1, DEFAULT_DISK), RAM.veto(1));
-    assertVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK + 1), DISK.veto(1));
+    assertVetoes(makeTask(DEFAULT_CPUS + 1, DEFAULT_RAM, DEFAULT_DISK), DEFAULT_MODE, CPU.veto(1));
+    assertVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM + 1, DEFAULT_DISK), DEFAULT_MODE, RAM.veto(1));
+    assertVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK + 1), DEFAULT_MODE, DISK.veto(1));
   }
 
   @Test
   public void testDedicatedRole() throws Exception {
     expectGetHostAttributes(HOST_A, dedicated(ROLE_A)).anyTimes();
-    expectGetHostMaintenanceStatus(HOST_A).times(2);
 
     control.replay();
 
@@ -216,7 +214,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     String dedicated2 = "kestrel/kestrel";
 
     expectGetHostAttributes(HOST_A, dedicated(dedicated1, dedicated2)).anyTimes();
-    expectGetHostMaintenanceStatus(HOST_A).atLeastOnce();
 
     control.replay();
 
@@ -240,8 +237,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   public void testMultiValuedAttributes() throws Exception {
     expectGetHostAttributes(HOST_A, valueAttribute("jvm", "1.0", "2.0", "3.0")).anyTimes();
     expectGetHostAttributes(HOST_B, valueAttribute("jvm", "1.0")).anyTimes();
-    expectGetHostMaintenanceStatus(HOST_A).atLeastOnce();
-    expectGetHostMaintenanceStatus(HOST_B).atLeastOnce();
 
     control.replay();
 
@@ -255,39 +250,36 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   @Test
   public void testHostScheduledForMaintenance() throws Exception {
     expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
-    expectGetHostMaintenanceStatus(HOST_A, MaintenanceMode.SCHEDULED);
 
     control.replay();
 
-    assertNoVetoes(makeTask(), HOST_A);
+    assertNoVetoes(makeTask(), HOST_A, MaintenanceMode.SCHEDULED);
   }
 
   @Test
   public void testHostDrainingForMaintenance() throws Exception {
     expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
-    expectGetHostMaintenanceStatus(HOST_A, MaintenanceMode.DRAINING);
 
     control.replay();
 
-    assertVetoes(makeTask(), ConstraintFilter.maintenanceVeto("draining"));
+    assertVetoes(makeTask(),
+        MaintenanceMode.DRAINING,
+        ConstraintFilter.maintenanceVeto("draining"));
   }
 
   @Test
   public void testHostDrainedForMaintenance() throws Exception {
     expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
-    expectGetHostMaintenanceStatus(HOST_A, MaintenanceMode.DRAINED);
 
     control.replay();
 
-    assertVetoes(makeTask(), ConstraintFilter.maintenanceVeto("drained"));
+    assertVetoes(makeTask(), MaintenanceMode.DRAINED, ConstraintFilter.maintenanceVeto("drained"));
   }
 
   @Test
   public void testMultipleTaskConstraints() throws Exception {
     expectGetHostAttributes(HOST_A, dedicated(HOST_A), host(HOST_A));
     expectGetHostAttributes(HOST_B, dedicated("xxx"), host(HOST_A));
-    expectGetHostMaintenanceStatus(HOST_A);
-    expectGetHostMaintenanceStatus(HOST_B);
 
     control.replay();
 
@@ -298,7 +290,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         makeTask(OWNER_A, JOB_A, constraint1, constraint2),
         HOST_A,
         mismatchVeto(DEDICATED_ATTRIBUTE));
-    assertNoVetoes(makeTask(OWNER_B, JOB_B, constraint1, constraint2), HOST_B);
+    assertNoVetoes(makeTask(OWNER_B, JOB_B, constraint1, constraint2), HOST_B, DEFAULT_MODE);
   }
 
   @Test
@@ -308,8 +300,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
 
     expectGetHostAttributes(HOST_A, host(HOST_A));
     expectGetHostAttributes(HOST_B, dedicated(OWNER_B.getRole() + "/" + JOB_B), host(HOST_B));
-    expectGetHostMaintenanceStatus(HOST_A);
-    expectGetHostMaintenanceStatus(HOST_B);
 
     control.replay();
 
@@ -328,11 +318,10 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   public void testUnderLimitNoTasks() throws Exception {
     expectGetHostAttributes(HOST_A);
     expectGetHostAttributes(HOST_A, host(HOST_A));
-    expectGetHostMaintenanceStatus(HOST_A);
 
     control.replay();
 
-    assertNoVetoes(hostLimitTask(2), HOST_A);
+    assertNoVetoes(hostLimitTask(2), HOST_A, DEFAULT_MODE);
   }
 
   private Attribute host(String host) {
@@ -352,9 +341,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
     expectGetHostAttributes(HOST_B, host(HOST_B), rack(RACK_A)).atLeastOnce();
     expectGetHostAttributes(HOST_C, host(HOST_C), rack(RACK_B)).atLeastOnce();
-    expectGetHostMaintenanceStatus(HOST_A).atLeastOnce();
-    expectGetHostMaintenanceStatus(HOST_B).atLeastOnce();
-    expectGetHostMaintenanceStatus(HOST_C).atLeastOnce();
 
     AttributeAggregate stateA = new AttributeAggregate(Suppliers.ofInstance(ImmutableSet.of(
         makeScheduledTask(OWNER_A, JOB_A, HOST_A),
@@ -371,24 +357,48 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     control.replay();
 
     assertNoVetoes(hostLimitTask(OWNER_A, JOB_A, 2), HOST_A, stateA);
-    assertVetoes(hostLimitTask(OWNER_A, JOB_A, 1), HOST_B, stateA, limitVeto(HOST_ATTRIBUTE));
-    assertVetoes(hostLimitTask(OWNER_A, JOB_A, 2), HOST_B, stateA, limitVeto(HOST_ATTRIBUTE));
+    assertVetoes(
+        hostLimitTask(OWNER_A, JOB_A, 1),
+        HOST_B,
+        stateA,
+        DEFAULT_MODE,
+        limitVeto(HOST_ATTRIBUTE));
+    assertVetoes(
+        hostLimitTask(OWNER_A, JOB_A, 2),
+        HOST_B,
+        stateA,
+        DEFAULT_MODE,
+        limitVeto(HOST_ATTRIBUTE));
     assertNoVetoes(hostLimitTask(OWNER_A, JOB_A, 3), HOST_B, stateA);
 
-    assertVetoes(rackLimitTask(OWNER_B, JOB_A, 2), HOST_B, stateB, limitVeto(RACK_ATTRIBUTE));
-    assertVetoes(rackLimitTask(OWNER_B, JOB_A, 3), HOST_B, stateB, limitVeto(RACK_ATTRIBUTE));
+    assertVetoes(
+        rackLimitTask(OWNER_B, JOB_A, 2),
+        HOST_B,
+        stateB,
+        DEFAULT_MODE,
+        limitVeto(RACK_ATTRIBUTE));
+    assertVetoes(
+        rackLimitTask(OWNER_B, JOB_A, 3),
+        HOST_B,
+        stateB,
+        DEFAULT_MODE,
+        limitVeto(RACK_ATTRIBUTE));
     assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 4), HOST_B, stateB);
 
     assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 1), HOST_C, stateB);
 
-    assertVetoes(rackLimitTask(OWNER_A, JOB_A, 1), HOST_C, stateA, limitVeto(RACK_ATTRIBUTE));
+    assertVetoes(
+        rackLimitTask(OWNER_A, JOB_A, 1),
+        HOST_C,
+        stateA,
+        DEFAULT_MODE,
+        limitVeto(RACK_ATTRIBUTE));
     assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 2), HOST_C, stateB);
   }
 
   @Test
   public void testAttribute() throws Exception {
     expectGetHostAttributes(HOST_A, valueAttribute("jvm", "1.0")).atLeastOnce();
-    expectGetHostMaintenanceStatus(HOST_A).atLeastOnce();
 
     control.replay();
 
@@ -413,7 +423,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     expectGetHostAttributes(HOST_A,
         valueAttribute("jvm", "1.4", "1.6", "1.7"),
         valueAttribute("zone", "a", "b", "c")).atLeastOnce();
-    expectGetHostMaintenanceStatus(HOST_A).atLeastOnce();
 
     control.replay();
 
@@ -437,7 +446,13 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     Constraint zoneConstraint = makeConstraint("zone", "c");
 
     ITaskConfig task = makeTask(OWNER_A, JOB_A, jvmConstraint, zoneConstraint);
-    assertTrue(defaultFilter.filter(DEFAULT_OFFER, HOST_A, task, TASK_ID, emptyJob).isEmpty());
+    assertTrue(
+        defaultFilter.filter(
+            DEFAULT_OFFER,
+            HOST_A,
+            DEFAULT_MODE,
+            task, TASK_ID,
+            emptyJob).isEmpty());
 
     Constraint jvmNegated = jvmConstraint.deepCopy();
     jvmNegated.getConstraint().getValue().setNegated(true);
@@ -464,7 +479,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     expectGetHostAttributes(HOST_A,
         valueAttribute("jvm", "1.4"),
         valueAttribute("jvm", "1.6", "1.7")).atLeastOnce();
-    expectGetHostMaintenanceStatus(HOST_A).atLeastOnce();
 
     control.replay();
 
@@ -519,14 +533,26 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     ITaskConfig task = makeTask(owner, jobName, constraint);
     assertEquals(
         expected,
-        defaultFilter.filter(DEFAULT_OFFER, host, task, TASK_ID, aggregate).isEmpty());
+        defaultFilter.filter(
+            DEFAULT_OFFER,
+            host,
+            DEFAULT_MODE,
+            task,
+            TASK_ID,
+            aggregate).isEmpty());
 
     Constraint negated = constraint.deepCopy();
     negated.getConstraint().getValue().setNegated(!value.isNegated());
     ITaskConfig negatedTask = makeTask(owner, jobName, negated);
     assertEquals(
         !expected,
-        defaultFilter.filter(DEFAULT_OFFER, host, negatedTask, TASK_ID, aggregate).isEmpty());
+        defaultFilter.filter(
+            DEFAULT_OFFER,
+            host,
+            DEFAULT_MODE,
+            negatedTask,
+            TASK_ID,
+            aggregate).isEmpty());
     return task;
   }
 
@@ -538,20 +564,27 @@ public class SchedulingFilterImplTest extends EasyMockTest {
     assertNoVetoes(task, HOST_A, jobState);
   }
 
-  private void assertNoVetoes(ITaskConfig task, String host) {
-    assertVetoes(task, host, emptyJob);
+  private void assertNoVetoes(ITaskConfig task, String host, MaintenanceMode mode) {
+    assertVetoes(task, host, emptyJob, mode);
   }
 
-  private void assertNoVetoes(ITaskConfig task, String host, AttributeAggregate jobState) {
-    assertVetoes(task, host, jobState);
+  private void assertNoVetoes(
+      ITaskConfig task,
+      String host,
+      AttributeAggregate jobState) {
+    assertVetoes(task, host, jobState, DEFAULT_MODE);
   }
 
-  private void assertVetoes(ITaskConfig task, Veto... vetos) {
-    assertVetoes(task, emptyJob, vetos);
+  private void assertVetoes(ITaskConfig task, MaintenanceMode mode, Veto... vetos) {
+    assertVetoes(task, emptyJob, mode, vetos);
   }
 
-  private void assertVetoes(ITaskConfig task, AttributeAggregate jobState, Veto... vetos) {
-    assertVetoes(task, HOST_A, jobState, vetos);
+  private void assertVetoes(
+      ITaskConfig task,
+      AttributeAggregate jobState,
+      MaintenanceMode mode,
+      Veto... vetos) {
+    assertVetoes(task, HOST_A, jobState, mode, vetos);
   }
 
   private void assertVetoes(
@@ -559,18 +592,19 @@ public class SchedulingFilterImplTest extends EasyMockTest {
       String host,
       Veto... vetoes) {
 
-    assertVetoes(task, host, emptyJob, vetoes);
+    assertVetoes(task, host, emptyJob, DEFAULT_MODE, vetoes);
   }
 
   private void assertVetoes(
       ITaskConfig task,
       String host,
       AttributeAggregate jobState,
+      MaintenanceMode mode,
       Veto... vetoes) {
 
     assertEquals(
         ImmutableSet.copyOf(vetoes),
-        defaultFilter.filter(DEFAULT_OFFER, host, task, TASK_ID, jobState));
+        defaultFilter.filter(DEFAULT_OFFER, host, mode, task, TASK_ID, jobState));
   }
 
   private Attribute valueAttribute(String name, String string, String... strings) {
@@ -583,15 +617,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         TaskConstraint.value(new ValueConstraint(false, ImmutableSet.copyOf(values))));
   }
 
-  private IExpectationSetters<MaintenanceMode> expectGetHostMaintenanceStatus(String host) {
-    return expectGetHostMaintenanceStatus(host, MaintenanceMode.NONE);
-  }
-
-  private IExpectationSetters<MaintenanceMode> expectGetHostMaintenanceStatus(
-      String host, MaintenanceMode mode) {
-    return expect(maintenance.getMode(host)).andReturn(mode);
-  }
-
   private IExpectationSetters<Optional<IHostAttributes>> expectGetHostAttributes(
       String host,
       Attribute... attributes) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/3ad0c5fb/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
index c48cbae..c7da5ab 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
@@ -20,6 +20,7 @@ import com.twitter.common.testing.easymock.EasyMockTest;
 
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.ExecutorConfig;
+import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.MesosTaskFactory;
@@ -44,6 +45,7 @@ import org.apache.mesos.Protos.Value.Type;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 
@@ -61,6 +63,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
           .setRanges(
               Ranges.newBuilder().addRange(Range.newBuilder().setBegin(PORT).setEnd(PORT))))
       .build();
+  private static final HostOffer HOST_OFFER = new HostOffer(OFFER, MaintenanceMode.NONE);
   private static final IScheduledTask TASK = IScheduledTask.build(
       new ScheduledTask()
           .setAssignedTask(new AssignedTask()
@@ -96,6 +99,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
     expect(filter.filter(
         ResourceSlot.from(OFFER),
         OFFER.getHostname(),
+        MaintenanceMode.NONE,
         TASK.getAssignedTask().getTask(),
         Tasks.id(TASK),
         emptyJob))
@@ -110,7 +114,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertEquals(Optional.of(TASK_INFO), assigner.maybeAssign(OFFER, TASK, emptyJob));
+    assertEquals(Optional.of(TASK_INFO), assigner.maybeAssign(HOST_OFFER, TASK, emptyJob));
   }
 
   @Test
@@ -118,6 +122,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
     expect(filter.filter(
         ResourceSlot.from(OFFER),
         OFFER.getHostname(),
+        MaintenanceMode.NONE,
         TASK.getAssignedTask().getTask(),
         Tasks.id(TASK),
         emptyJob))
@@ -125,6 +130,6 @@ public class TaskAssignerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertEquals(Optional.<TaskInfo>absent(), assigner.maybeAssign(OFFER, TASK, emptyJob));
+    assertEquals(Optional.<TaskInfo>absent(), assigner.maybeAssign(HOST_OFFER, TASK, emptyJob));
   }
 }


Mime
View raw message