aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [2/2] aurora git commit: Refactor scheduling code to split matching and assigning phases
Date Tue, 09 Jan 2018 22:51:10 GMT
Refactor scheduling code to split matching and assigning phases

This patch sets the stage for performing the bulk of scheduling work in a
separate call path, without holding the write lock.

Also included is a mechanical refactor pushing the `revocable` flag into
`ResourceRequest` (which was ~always needed as a sibling parameter).

Reviewed at https://reviews.apache.org/r/64954/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/4e6242fe
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/4e6242fe
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/4e6242fe

Branch: refs/heads/master
Commit: 4e6242fed68650e7be906ec3d17ae750f49f8cb8
Parents: 5b34231
Author: Bill Farner <wfarner@apache.org>
Authored: Tue Jan 9 14:50:51 2018 -0800
Committer: Bill Farner <wfarner@apache.org>
Committed: Tue Jan 9 14:50:51 2018 -0800

----------------------------------------------------------------------
 .../common/testing/easymock/EasyMockTest.java   |  14 +-
 .../benchmark/fakes/FakeOfferManager.java       |  12 +-
 .../apache/aurora/scheduler/TierManager.java    |   4 +-
 .../aurora/scheduler/base/TaskTestUtil.java     |  14 +-
 .../executor/ExecutorSettings.java              |  21 +-
 .../scheduler/filter/SchedulingFilter.java      |  42 +++-
 .../scheduler/mesos/MesosTaskFactory.java       |   7 +-
 .../aurora/scheduler/offers/HostOffers.java     |  29 +--
 .../aurora/scheduler/offers/OfferManager.java   |  10 +-
 .../scheduler/offers/OfferManagerImpl.java      |  12 +-
 .../scheduler/preemptor/PreemptionVictim.java   |   5 +-
 .../preemptor/PreemptionVictimFilter.java       |  20 +-
 .../scheduler/resources/ResourceManager.java    |   6 +
 .../scheduler/scheduling/TaskAssigner.java      |   2 +-
 .../scheduler/scheduling/TaskAssignerImpl.java  | 237 +++++++++----------
 .../scheduler/scheduling/TaskScheduler.java     |   2 +-
 .../scheduler/scheduling/TaskSchedulerImpl.java | 100 ++++----
 .../aurora/scheduler/storage/TaskStore.java     |   3 +-
 .../storage/durability/WriteRecorder.java       |   3 +-
 .../scheduler/updater/UpdateAgentReserver.java  |  33 +--
 .../aurora/scheduler/TierManagerTest.java       |   7 +
 .../events/NotifyingSchedulingFilterTest.java   |   6 +-
 .../filter/SchedulingFilterImplTest.java        |  36 ++-
 .../mesos/MesosTaskFactoryImplTest.java         |  12 +-
 .../scheduler/offers/OfferManagerImplTest.java  |  51 ++--
 .../preemptor/PreemptionVictimFilterTest.java   | 101 ++------
 .../scheduling/FirstFitOfferSelectorTest.java   |   9 +-
 .../scheduling/TaskAssignerImplTest.java        | 170 ++++++-------
 .../scheduling/TaskSchedulerImplTest.java       |  56 ++---
 .../updater/NullAgentReserverTest.java          |   3 +-
 .../updater/UpdateAgentReserverImplTest.java    |  45 +---
 31 files changed, 476 insertions(+), 596 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java b/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java
index c5500b3..15fc677 100644
--- a/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java
+++ b/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java
@@ -26,6 +26,9 @@ import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.IMocksControl;
 import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
 
 import static org.easymock.EasyMock.createControl;
 
@@ -38,6 +41,16 @@ import static org.easymock.EasyMock.createControl;
 public abstract class EasyMockTest extends TearDownTestCase {
   protected IMocksControl control;
 
+  @Rule
+  public final TestWatcher verifyControl = new TestWatcher() {
+    @Override
+    protected void succeeded(Description description) {
+      // Only attempt to verify the control when the test case otherwise succeeded.  This prevents
+      // spurious mock-related error messages that distract from the real error.
+      control.verify();
+    }
+  };
+
   /**
    * Creates an EasyMock {@link #control} for tests to use that will be automatically
    * {@link IMocksControl#verify() verified} on tear down.
@@ -45,7 +58,6 @@ public abstract class EasyMockTest extends TearDownTestCase {
   @Before
   public final void setupEasyMock() {
     control = createControl();
-    addTearDown(() -> control.verify());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
index f0dacd4..0a105c7 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
@@ -15,6 +15,8 @@ package org.apache.aurora.benchmark.fakes;
 
 import java.util.Optional;
 
+import com.google.common.collect.ImmutableList;
+
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
@@ -59,18 +61,14 @@ public class FakeOfferManager implements OfferManager {
   }
 
   @Override
-  public Optional<HostOffer> getMatching(Protos.AgentID slaveId,
-                                         ResourceRequest resourceRequest,
-                                         boolean revocable) {
-
+  public Optional<HostOffer> getMatching(Protos.AgentID slaveId, ResourceRequest resourceRequest) {
     return Optional.empty();
   }
 
   @Override
   public Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
-                                            ResourceRequest resourceRequest,
-                                            boolean revocable) {
+                                            ResourceRequest resourceRequest) {
 
-    return null;
+    return ImmutableList.of();
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/TierManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TierManager.java b/src/main/java/org/apache/aurora/scheduler/TierManager.java
index c6ad2b1..a37fea4 100644
--- a/src/main/java/org/apache/aurora/scheduler/TierManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/TierManager.java
@@ -14,6 +14,7 @@
 package org.apache.aurora.scheduler;
 
 import java.util.Map;
+import java.util.Optional;
 
 import javax.inject.Inject;
 
@@ -102,7 +103,8 @@ public interface TierManager {
           !taskConfig.isSetTier() || tierConfig.tiers.containsKey(taskConfig.getTier()),
           "Invalid tier '%s' in TaskConfig.", taskConfig.getTier());
 
-      return tierConfig.tiers.get(taskConfig.getTier());
+      return tierConfig.tiers.get(
+          Optional.ofNullable(taskConfig.getTier()).orElse(tierConfig.defaultTier));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
index 22d5a64..2b61c27 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
@@ -47,6 +47,8 @@ import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.ConfigurationManagerSettings;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorConfig;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -70,12 +72,14 @@ public final class TaskTestUtil {
       new TierInfo(true /* preemptible */, false /* revocable */);
   public static final TierInfo PREFERRED_TIER =
       new TierInfo(false /* preemptible */, false /* revocable */);
+  public static final String REVOCABLE_TIER_NAME = "tier-revocable";
   public static final String PROD_TIER_NAME = "tier-prod";
   public static final String DEV_TIER_NAME = "tier-dev";
   public static final TierConfig TIER_CONFIG =
       new TierConfig(DEV_TIER_NAME, ImmutableMap.of(
           PROD_TIER_NAME, PREFERRED_TIER,
-          DEV_TIER_NAME, DEV_TIER
+          DEV_TIER_NAME, DEV_TIER,
+          REVOCABLE_TIER_NAME, REVOCABLE_TIER
       ));
   public static final TierManager TIER_MANAGER = new TierManager.TierManagerImpl(TIER_CONFIG);
   public static final ThriftBackfill THRIFT_BACKFILL = new ThriftBackfill(TIER_MANAGER);
@@ -237,4 +241,12 @@ public final class TaskTestUtil {
         new org.apache.aurora.gen.TierConfig("revocable", REVOCABLE_TIER.toMap())
     );
   }
+
+  public static ResourceRequest toResourceRequest(ITaskConfig task) {
+    return ResourceRequest.fromTask(
+        task,
+        EXECUTOR_SETTINGS,
+        AttributeAggregate.empty(),
+        TIER_MANAGER);
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java
index 5c987fd..dac84e2 100644
--- a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java
+++ b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java
@@ -19,6 +19,9 @@ import java.util.Optional;
 
 import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceManager;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static java.util.Objects.requireNonNull;
 
@@ -26,6 +29,9 @@ import static java.util.Objects.requireNonNull;
  * Configuration for the executor to run, and resource overhead required for it.
  */
 public class ExecutorSettings {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutorSettings.class);
+
   private final Map<String, ExecutorConfig> config;
   private final boolean populateDiscoveryInfo;
 
@@ -45,12 +51,19 @@ public class ExecutorSettings {
     return populateDiscoveryInfo;
   }
 
-  public Optional<ResourceBag> getExecutorOverhead(String name) {
+  public ResourceBag getExecutorOverhead(ITaskConfig task) {
+    if (!task.isSetExecutorConfig()) {
+      // Docker-based tasks don't need executors
+      return ResourceBag.EMPTY;
+    }
+
+    String name = task.getExecutorConfig().getName();
     if (config.containsKey(name)) {
-      return Optional.of(
-          ResourceManager.bagFromMesosResources(config.get(name).getExecutor().getResourcesList()));
+      return ResourceManager.bagFromMesosResources(
+          config.get(name).getExecutor().getResourcesList());
     } else {
-      return Optional.empty();
+      LOG.warn("No executor configuration found for " + name);
+      return ResourceBag.EMPTY;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
index bd41590..fd97259 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
@@ -21,12 +21,17 @@ import java.util.Set;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 
+import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.offers.HostOffer;
 import org.apache.aurora.scheduler.resources.ResourceBag;
+import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.storage.entities.IConstraint;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
+import static java.util.Objects.requireNonNull;
+
 import static org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType.CONSTRAINT_MISMATCH;
 import static org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType.INSUFFICIENT_RESOURCES;
 import static org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType.LIMIT_NOT_SATISFIED;
@@ -306,11 +311,31 @@ public interface SchedulingFilter {
     private final ITaskConfig task;
     private final ResourceBag request;
     private final AttributeAggregate jobState;
+    private final boolean revocable;
+
+    private ResourceRequest(
+        ITaskConfig task,
+        ResourceBag request,
+        AttributeAggregate jobState,
+        boolean revocable) {
 
-    public ResourceRequest(ITaskConfig task, ResourceBag request, AttributeAggregate jobState) {
-      this.task = task;
-      this.request = request;
-      this.jobState = jobState;
+      this.task = requireNonNull(task);
+      this.request = requireNonNull(request);
+      this.jobState = requireNonNull(jobState);
+      this.revocable = revocable;
+    }
+
+    public static ResourceRequest fromTask(
+        ITaskConfig task,
+        ExecutorSettings executorSettings,
+        AttributeAggregate jobState,
+        TierManager tierManager) {
+
+      return new ResourceRequest(
+          task,
+          ResourceManager.bagFromTask(task, executorSettings),
+          jobState,
+          tierManager.getTier(task).isRevocable());
     }
 
     public Iterable<IConstraint> getConstraints() {
@@ -329,6 +354,10 @@ public interface SchedulingFilter {
       return jobState;
     }
 
+    public boolean isRevocable() {
+      return revocable;
+    }
+
     @Override
     public boolean equals(Object o) {
       if (!(o instanceof ResourceRequest)) {
@@ -338,12 +367,13 @@ public interface SchedulingFilter {
       ResourceRequest other = (ResourceRequest) o;
       return Objects.equals(task, other.task)
           && Objects.equals(request, other.request)
-          && Objects.equals(jobState, other.jobState);
+          && Objects.equals(jobState, other.jobState)
+          && revocable == other.revocable;
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(task, request, jobState);
+      return Objects.hash(task, request, jobState, revocable);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
index cb288bb..bcb2bbf 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
@@ -152,12 +152,7 @@ public interface MesosTaskFactory {
 
       ITaskConfig config = task.getTask();
 
-      // Docker-based tasks don't need executors
-      ResourceBag executorOverhead = ResourceBag.EMPTY;
-      if (config.isSetExecutorConfig()) {
-        executorOverhead =
-            executorSettings.getExecutorOverhead(getExecutorName(task)).orElse(ResourceBag.EMPTY);
-      }
+      ResourceBag executorOverhead = executorSettings.getExecutorOverhead(config);
 
       AcceptedOffer acceptedOffer;
       // TODO(wfarner): Re-evaluate if/why we need to continue handling unset assignedPorts field.

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java b/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
index a01c0a8..2ea7a01 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
@@ -154,22 +154,13 @@ class HostOffers {
         .toSet();
   }
 
-  synchronized Optional<HostOffer> getMatching(Protos.AgentID slaveId,
-                                               ResourceRequest resourceRequest,
-                                               boolean revocable) {
+  synchronized Optional<HostOffer> getMatching(
+      Protos.AgentID slaveId,
+      ResourceRequest resourceRequest) {
 
-    Optional<HostOffer> optionalOffer = get(slaveId);
-    if (optionalOffer.isPresent()) {
-      HostOffer offer = optionalOffer.get();
-
-      if (isGloballyBanned(offer)
-          || isVetoed(offer, resourceRequest, revocable, Optional.empty())) {
-
-        return Optional.empty();
-      }
-    }
-
-    return optionalOffer;
+    return get(slaveId)
+        .filter(offer -> !isGloballyBanned(offer))
+        .filter(offer -> !isVetoed(offer, resourceRequest, Optional.empty()));
   }
 
   /**
@@ -182,14 +173,13 @@ class HostOffers {
    * @return The offers a given task group can use.
    */
   synchronized Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
-                                                  ResourceRequest resourceRequest,
-                                                  boolean revocable) {
+                                                  ResourceRequest resourceRequest) {
 
     return Iterables.unmodifiableIterable(FluentIterable.from(offers)
         .filter(o -> !isGloballyBanned(o))
         .filter(o -> !isStaticallyBanned(o, groupKey))
         .filter(HostOffer::hasCpuAndMem)
-        .filter(o -> !isVetoed(o, resourceRequest, revocable, Optional.of(groupKey))));
+        .filter(o -> !isVetoed(o, resourceRequest, Optional.of(groupKey))));
   }
 
   private synchronized boolean isGloballyBanned(HostOffer offer) {
@@ -207,11 +197,10 @@ class HostOffers {
    */
   private boolean isVetoed(HostOffer offer,
                            ResourceRequest resourceRequest,
-                           boolean revocable,
                            Optional<TaskGroupKey> groupKey) {
 
     vetoEvaluatedOffers.incrementAndGet();
-    UnusedResource unusedResource = new UnusedResource(offer, revocable);
+    UnusedResource unusedResource = new UnusedResource(offer, resourceRequest.isRevocable());
     Set<Veto> vetoes = schedulingFilter.filter(unusedResource, resourceRequest);
     if (!vetoes.isEmpty()) {
       if (groupKey.isPresent() && Veto.identifyGroup(vetoes) == SchedulingFilter.VetoGroup.STATIC) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
index e90de3e..8f9e33d 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
@@ -81,12 +81,9 @@ public interface OfferManager extends EventSubscriber {
    *
    * @param slaveId Slave ID to get the offer for.
    * @param resourceRequest The request that the offer should satisfy.
-   * @param revocable Whether or not the request can use revocable resources.
    * @return An option containing the offer for the slave ID if it fits.
    */
-  Optional<HostOffer> getMatching(AgentID slaveId,
-                                  ResourceRequest resourceRequest,
-                                  boolean revocable);
+  Optional<HostOffer> getMatching(AgentID slaveId, ResourceRequest resourceRequest);
 
   /**
    * Gets all offers that the scheduler is holding that satisfy the supplied
@@ -94,12 +91,9 @@ public interface OfferManager extends EventSubscriber {
    *
    * @param groupKey The {@link TaskGroupKey} of the task in the {@link ResourceRequest}.
    * @param resourceRequest The request that the offer should satisfy.
-   * @param revocable Whether or not the request can use revocable resources.
    * @return An option containing the offer for the slave ID if it fits.
    */
-  Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
-                                     ResourceRequest resourceRequest,
-                                     boolean revocable);
+  Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey, ResourceRequest resourceRequest);
 
   /**
    * Launches the task matched against the offer.

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
index 8e806b7..084b48c 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
@@ -162,19 +162,15 @@ public class OfferManagerImpl implements OfferManager {
   }
 
   @Override
-  public Optional<HostOffer> getMatching(Protos.AgentID slaveId,
-                                         ResourceRequest resourceRequest,
-                                         boolean revocable) {
-
-    return hostOffers.getMatching(slaveId, resourceRequest, revocable);
+  public Optional<HostOffer> getMatching(Protos.AgentID slaveId, ResourceRequest resourceRequest) {
+    return hostOffers.getMatching(slaveId, resourceRequest);
   }
 
   @Override
   public Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
-                                            ResourceRequest resourceRequest,
-                                            boolean revocable) {
+                                            ResourceRequest resourceRequest) {
 
-    return hostOffers.getAllMatching(groupKey, resourceRequest, revocable);
+    return hostOffers.getAllMatching(groupKey, resourceRequest);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
index 69b6866..780689e 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java
@@ -17,6 +17,7 @@ import java.util.Objects;
 
 import com.google.common.base.MoreObjects;
 
+import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
@@ -54,8 +55,8 @@ public final class PreemptionVictim {
     return task.getTask().getPriority();
   }
 
-  public ResourceBag getResourceBag() {
-    return ResourceManager.bagFromResources(task.getTask().getResources());
+  public ResourceBag getResourceBag(ExecutorSettings executorSettings) {
+    return ResourceManager.bagFromTask(task.getTask(), executorSettings);
   }
 
   public String getTaskId() {

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
index cf6d348..569cfe6 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
@@ -38,7 +38,6 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.offers.HostOffer;
 import org.apache.aurora.scheduler.resources.ResourceBag;
-import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
@@ -113,13 +112,7 @@ public interface PreemptionVictimFilter {
         new Function<PreemptionVictim, ResourceBag>() {
           @Override
           public ResourceBag apply(PreemptionVictim victim) {
-            ResourceBag bag = victim.getResourceBag();
-
-            if (victim.getConfig().isSetExecutorConfig()) {
-              // Be pessimistic about revocable resource available if config is not available
-              bag.add(executorSettings.getExecutorOverhead(
-                  victim.getConfig().getExecutorConfig().getName()).orElse(EMPTY));
-            }
+            ResourceBag bag = victim.getResourceBag(executorSettings);
 
             if (tierManager.getTier(victim.getConfig()).isRevocable()) {
               // Revocable task CPU cannot be used for preemption purposes as it's a compressible
@@ -223,10 +216,8 @@ public interface PreemptionVictimFilter {
         return Optional.empty();
       }
 
-      ResourceBag overhead = pendingTask.isSetExecutorConfig()
-          ? executorSettings.getExecutorOverhead(
-              pendingTask.getExecutorConfig().getName()).orElse(EMPTY)
-          : EMPTY;
+      ResourceRequest requiredResources =
+          ResourceRequest.fromTask(pendingTask, executorSettings, jobState, tierManager);
 
       ResourceBag totalResource = slackResources;
       for (PreemptionVictim victim : sortedVictims) {
@@ -240,10 +231,7 @@ public interface PreemptionVictimFilter {
 
         Set<Veto> vetoes = schedulingFilter.filter(
             new UnusedResource(totalResource, attributes.get(), unavailability),
-            new ResourceRequest(
-                pendingTask,
-                ResourceManager.bagFromResources(pendingTask.getResources()).add(overhead),
-                jobState));
+            requiredResources);
 
         if (vetoes.isEmpty()) {
           return Optional.of(ImmutableSet.copyOf(toPreemptTasks));

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
index d093753..2bf9808 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
@@ -26,6 +26,7 @@ import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
 
 import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IResource;
@@ -245,6 +246,11 @@ public final class ResourceManager {
     return bagFromResources(resources, RESOURCE_TO_TYPE, QUANTIFY_RESOURCE);
   }
 
+  public static ResourceBag bagFromTask(ITaskConfig task, ExecutorSettings executorSettings) {
+    return bagFromResources(task.getResources(), RESOURCE_TO_TYPE, QUANTIFY_RESOURCE)
+        .add(executorSettings.getExecutorOverhead(task));
+  }
+
   /**
    * Creates a {@link ResourceBag} from Mesos resources.
    *

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java
index 87619b5..d2597a1 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java
@@ -41,6 +41,6 @@ public interface TaskAssigner {
       MutableStoreProvider storeProvider,
       ResourceRequest resourceRequest,
       TaskGroupKey groupKey,
-      Iterable<IAssignedTask> tasks,
+      Set<IAssignedTask> tasks,
       Map<String, TaskGroupKey> preemptionReservations);
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java
index 54bd177..ec416cc 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java
@@ -13,6 +13,7 @@
  */
 package org.apache.aurora.scheduler.scheduling;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -21,22 +22,22 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
 
 import org.apache.aurora.common.stats.StatsProvider;
-import org.apache.aurora.scheduler.TierManager;
 import org.apache.aurora.scheduler.base.InstanceKeys;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
 import org.apache.aurora.scheduler.offers.HostOffer;
 import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.offers.OfferManager.LaunchException;
 import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
 import org.apache.aurora.scheduler.updater.UpdateAgentReserver;
@@ -64,7 +65,6 @@ public class TaskAssignerImpl implements TaskAssigner {
   private final StateManager stateManager;
   private final MesosTaskFactory taskFactory;
   private final OfferManager offerManager;
-  private final TierManager tierManager;
   private final UpdateAgentReserver updateAgentReserver;
   private final OfferSelector offerSelector;
 
@@ -73,7 +73,6 @@ public class TaskAssignerImpl implements TaskAssigner {
       StateManager stateManager,
       MesosTaskFactory taskFactory,
       OfferManager offerManager,
-      TierManager tierManager,
       UpdateAgentReserver updateAgentReserver,
       StatsProvider statsProvider,
       OfferSelector offerSelector) {
@@ -81,7 +80,6 @@ public class TaskAssignerImpl implements TaskAssigner {
     this.stateManager = requireNonNull(stateManager);
     this.taskFactory = requireNonNull(taskFactory);
     this.offerManager = requireNonNull(offerManager);
-    this.tierManager = requireNonNull(tierManager);
     this.launchFailures = statsProvider.makeCounter(ASSIGNER_LAUNCH_FAILURES);
     this.updateAgentReserver = requireNonNull(updateAgentReserver);
     this.offerSelector = requireNonNull(offerSelector);
@@ -99,7 +97,7 @@ public class TaskAssignerImpl implements TaskAssigner {
   }
 
   private Protos.TaskInfo assign(
-      Storage.MutableStoreProvider storeProvider,
+      MutableStoreProvider storeProvider,
       Protos.Offer offer,
       String taskId,
       boolean revocable) {
@@ -118,20 +116,18 @@ public class TaskAssignerImpl implements TaskAssigner {
   }
 
   private void launchUsingOffer(
-      Storage.MutableStoreProvider storeProvider,
-      boolean revocable,
+      MutableStoreProvider stores,
       ResourceRequest resourceRequest,
       IAssignedTask task,
-      HostOffer offer,
-      ImmutableSet.Builder<String> assignmentResult) throws OfferManager.LaunchException {
+      HostOffer offer) throws LaunchException {
 
     String taskId = task.getTaskId();
-    Protos.TaskInfo taskInfo = assign(storeProvider, offer.getOffer(), taskId, revocable);
+    Protos.TaskInfo taskInfo =
+        assign(stores, offer.getOffer(), taskId, resourceRequest.isRevocable());
     resourceRequest.getJobState().updateAttributeAggregate(offer.getAttributes());
     try {
       offerManager.launchTask(offer.getOffer().getId(), taskInfo);
-      assignmentResult.add(taskId);
-    } catch (OfferManager.LaunchException e) {
+    } catch (LaunchException e) {
       LOG.warn("Failed to launch task.", e);
       launchFailures.incrementAndGet();
 
@@ -139,147 +135,144 @@ public class TaskAssignerImpl implements TaskAssigner {
       // It is in the LOST state and a new task will move to PENDING to replace it.
       // Should the state change fail due to storage issues, that's okay.  The task will
       // time out in the ASSIGNED state and be moved to LOST.
-      stateManager.changeState(
-          storeProvider,
-          taskId,
-          Optional.of(PENDING),
-          LOST,
-          LAUNCH_FAILED_MSG);
+      stateManager.changeState(stores, taskId, Optional.of(PENDING), LOST, LAUNCH_FAILED_MSG);
       throw e;
     }
   }
 
-  private Iterable<IAssignedTask> maybeAssignReserved(
-      Iterable<IAssignedTask> tasks,
-      Storage.MutableStoreProvider storeProvider,
-      boolean revocable,
-      ResourceRequest resourceRequest,
-      TaskGroupKey groupKey,
-      ImmutableSet.Builder<String> assignmentResult) {
+  private static final class ReservationStatus {
+    final boolean taskReserving;
+    final Optional<HostOffer> offer;
 
-    if (!updateAgentReserver.hasReservations(groupKey)) {
-      return tasks;
+    private ReservationStatus(boolean taskReserving, Optional<HostOffer> offer) {
+      this.taskReserving = taskReserving;
+      this.offer = requireNonNull(offer);
     }
 
-    // Data structure to record which tasks should be excluded from the regular (non-reserved)
-    // scheduling loop. This is important because we release reservations once they are used,
-    // so we need to record them separately to avoid them being double-scheduled.
-    ImmutableSet.Builder<IInstanceKey> excludeBuilder = ImmutableSet.builder();
-
-    for (IAssignedTask task : tasks) {
-      IInstanceKey key = InstanceKeys.from(task.getTask().getJob(), task.getInstanceId());
-      Optional<String> maybeAgentId = updateAgentReserver.getAgent(key);
-      if (maybeAgentId.isPresent()) {
-        excludeBuilder.add(key);
-        Optional<HostOffer> offer = offerManager.getMatching(
-            Protos.AgentID.newBuilder().setValue(maybeAgentId.get()).build(),
-            resourceRequest,
-            revocable);
-        if (offer.isPresent()) {
-          try {
-            // The offer can still be veto'd because of changed constraints, or because the
-            // Scheduler hasn't been updated by Mesos yet...
-            launchUsingOffer(storeProvider,
-                revocable,
-                resourceRequest,
-                task,
-                offer.get(),
-                assignmentResult);
-            LOG.info("Used update reservation for {} on {}", key, maybeAgentId.get());
-            updateAgentReserver.release(maybeAgentId.get(), key);
-          } catch (OfferManager.LaunchException e) {
-            updateAgentReserver.release(maybeAgentId.get(), key);
-          }
-        } else {
-          LOG.info(
-              "Tried to reuse offer on {} for {}, but was not ready yet.",
-              maybeAgentId.get(),
-              key);
-        }
-      }
+    static final ReservationStatus NOT_RESERVING = new ReservationStatus(false, Optional.empty());
+    static final ReservationStatus NOT_READY = new ReservationStatus(true, Optional.empty());
+
+    static ReservationStatus ready(HostOffer offer) {
+      return new ReservationStatus(true, Optional.of(offer));
     }
 
-    // Return only the tasks that didn't have reservations. Offers on agents that were reserved
-    // might not have been seen by Aurora yet, so we need to wait until the reservation expires
-    // before giving up and falling back to the first-fit algorithm.
-    Set<IInstanceKey> toBeExcluded = excludeBuilder.build();
-    return Iterables.filter(tasks, t -> !toBeExcluded.contains(
-        InstanceKeys.from(t.getTask().getJob(), t.getInstanceId())));
+    boolean isTaskReserving() {
+      return taskReserving;
+    }
+
+    Optional<HostOffer> getOffer() {
+      return offer;
+    }
+  }
+
+  private ReservationStatus getReservation(IAssignedTask task, ResourceRequest resourceRequest) {
+
+    IInstanceKey key = InstanceKeys.from(task.getTask().getJob(), task.getInstanceId());
+    Optional<String> agentId = updateAgentReserver.getAgent(key);
+    if (!agentId.isPresent()) {
+      return ReservationStatus.NOT_RESERVING;
+    }
+    Optional<HostOffer> offer = offerManager.getMatching(
+        Protos.AgentID.newBuilder().setValue(agentId.get()).build(),
+        resourceRequest);
+    if (offer.isPresent()) {
+      LOG.info("Used update reservation for {} on {}", key, agentId.get());
+      updateAgentReserver.release(agentId.get(), key);
+      return ReservationStatus.ready(offer.get());
+    } else {
+      LOG.info(
+          "Tried to reuse offer on {} for {}, but was not ready yet.",
+          agentId.get(),
+          key);
+      return ReservationStatus.NOT_READY;
+    }
   }
 
   /**
    * Determine whether or not the offer is reserved for a different task via preemption or
    * update affinity.
    */
-  @SuppressWarnings("PMD.UselessParentheses")  // TODO(jly): PMD bug, remove when upgrade from 5.5.3
-  private boolean isAgentReserved(HostOffer offer,
-                                  TaskGroupKey groupKey,
-                                  Map<String, TaskGroupKey> preemptionReservations) {
+  private boolean isAgentReserved(
+      HostOffer offer,
+      TaskGroupKey groupKey,
+      Map<String, TaskGroupKey> preemptionReservations) {
 
     String agentId = offer.getOffer().getAgentId().getValue();
-    Optional<TaskGroupKey> reservedGroup = Optional.ofNullable(
-        preemptionReservations.get(agentId));
+    boolean reservedForPreemption = Optional.ofNullable(preemptionReservations.get(agentId))
+        .map(group -> !group.equals(groupKey))
+        .orElse(false);
 
-    return (reservedGroup.isPresent() && !reservedGroup.get().equals(groupKey))
-        || !updateAgentReserver.getReservations(agentId).isEmpty();
+    return reservedForPreemption || updateAgentReserver.isReserved(agentId);
   }
 
-  @Timed("assigner_maybe_assign")
-  @Override
-  public Set<String> maybeAssign(
-      Storage.MutableStoreProvider storeProvider,
-      ResourceRequest resourceRequest,
-      TaskGroupKey groupKey,
-      Iterable<IAssignedTask> tasks,
-      Map<String, TaskGroupKey> preemptionReservations) {
+  private static class SchedulingMatch {
+    final IAssignedTask task;
+    final HostOffer offer;
 
-    if (Iterables.isEmpty(tasks)) {
-      return ImmutableSet.of();
+    SchedulingMatch(IAssignedTask task, HostOffer offer) {
+      this.task = requireNonNull(task);
+      this.offer = requireNonNull(offer);
     }
+  }
 
-    boolean revocable = tierManager.getTier(groupKey.getTask()).isRevocable();
-    ImmutableSet.Builder<String> assignmentResult = ImmutableSet.builder();
+  private Collection<SchedulingMatch> findMatches(
+      ResourceRequest resourceRequest,
+      TaskGroupKey groupKey,
+      Set<IAssignedTask> tasks,
+      Map<String, TaskGroupKey> preemptionReservations) {
 
-    // Assign tasks reserved for a specific agent (e.g. for update affinity)
-    Iterable<IAssignedTask> nonReservedTasks = maybeAssignReserved(
-        tasks,
-        storeProvider,
-        revocable,
-        resourceRequest,
-        groupKey,
-        assignmentResult);
+    // Avoid matching multiple tasks against any offer.
+    Map<String, SchedulingMatch> matchesByOffer = Maps.newHashMap();
 
-    // Assign the rest of the non-reserved tasks
-    for (IAssignedTask task : nonReservedTasks) {
-      try {
+    tasks.forEach(task -> {
+      ReservationStatus reservation = getReservation(task, resourceRequest);
+      Optional<HostOffer> chosenOffer;
+      if (reservation.isTaskReserving()) {
+        // Use the reserved offer, which may not currently exist.
+        chosenOffer = reservation.getOffer();
+      } else {
         // Get all offers that will satisfy the given ResourceRequest and that are not reserved
         // for updates or preemption
-        FluentIterable<HostOffer> matchingOffers = FluentIterable
-            .from(offerManager.getAllMatching(groupKey, resourceRequest, revocable))
-            .filter(o -> !isAgentReserved(o, groupKey, preemptionReservations));
+        Iterable<HostOffer> matchingOffers = Iterables.filter(
+            offerManager.getAllMatching(groupKey, resourceRequest),
+            o -> !matchesByOffer.containsKey(o.getOffer().getId().getValue())
+                && !isAgentReserved(o, groupKey, preemptionReservations));
 
         // Determine which is the optimal offer to select for the given request
-        Optional<HostOffer> optionalOffer = offerSelector.select(matchingOffers, resourceRequest);
-
-        // If no offer is chosen, continue to the next task
-        if (!optionalOffer.isPresent()) {
-          continue;
-        }
-
-        // Attempt to launch the task using the chosen offer
-        HostOffer offer = optionalOffer.get();
-        launchUsingOffer(storeProvider,
-            revocable,
-            resourceRequest,
-            task,
-            offer,
-            assignmentResult);
-      } catch (OfferManager.LaunchException e) {
+        chosenOffer = offerSelector.select(matchingOffers, resourceRequest);
+      }
+
+      chosenOffer.ifPresent(hostOffer -> {
+        matchesByOffer.put(
+            hostOffer.getOffer().getId().getValue(),
+            new SchedulingMatch(task, hostOffer));
+      });
+    });
+
+    return matchesByOffer.values();
+  }
+
+  @Timed("assigner_maybe_assign")
+  @Override
+  public Set<String> maybeAssign(
+      MutableStoreProvider storeProvider,
+      ResourceRequest resourceRequest,
+      TaskGroupKey groupKey,
+      Set<IAssignedTask> tasks,
+      Map<String, TaskGroupKey> reservations) {
+
+    ImmutableSet.Builder<String> assigned = ImmutableSet.builder();
+
+    for (SchedulingMatch match : findMatches(resourceRequest, groupKey, tasks, reservations)) {
+      try {
+        launchUsingOffer(storeProvider, resourceRequest, match.task, match.offer);
+        assigned.add(match.task.getTaskId());
+      } catch (LaunchException e) {
         // Any launch exception causes the scheduling round to terminate for this TaskGroup.
         break;
       }
     }
 
-    return assignmentResult.build();
+    return assigned.build();
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
index 3c38f95..d2f3257 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
@@ -31,5 +31,5 @@ public interface TaskScheduler extends EventSubscriber {
    * @return Successfully scheduled task IDs. The caller should call schedule again if a given
    *         task ID was not present in the result.
    */
-  Set<String> schedule(MutableStoreProvider storeProvider, Iterable<String> taskIds);
+  Set<String> schedule(MutableStoreProvider storeProvider, Set<String> taskIds);
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
index cff4ab1..edab03d 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
@@ -19,6 +19,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import javax.inject.Inject;
@@ -26,7 +27,6 @@ import javax.inject.Qualifier;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -34,16 +34,17 @@ import com.google.common.eventbus.Subscribe;
 
 import org.apache.aurora.common.inject.TimedInterceptor.Timed;
 import org.apache.aurora.common.stats.Stats;
+import org.apache.aurora.scheduler.TierManager;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.preemptor.BiCache;
 import org.apache.aurora.scheduler.preemptor.Preemptor;
-import org.apache.aurora.scheduler.resources.ResourceBag;
-import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -55,10 +56,8 @@ import static java.lang.annotation.ElementType.METHOD;
 import static java.lang.annotation.ElementType.PARAMETER;
 import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
-import static java.util.stream.Collectors.toMap;
 
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromResources;
 
 /**
  * An asynchronous task scheduler.  Scheduling of tasks is performed on a delay, where each task
@@ -81,6 +80,7 @@ public class TaskSchedulerImpl implements TaskScheduler {
   private final TaskAssigner assigner;
   private final Preemptor preemptor;
   private final ExecutorSettings executorSettings;
+  private final TierManager tierManager;
   private final BiCache<String, TaskGroupKey> reservations;
 
   private final AtomicLong attemptsFired = Stats.exportLong("schedule_attempts_fired");
@@ -92,16 +92,18 @@ public class TaskSchedulerImpl implements TaskScheduler {
       TaskAssigner assigner,
       Preemptor preemptor,
       ExecutorSettings executorSettings,
+      TierManager tierManager,
       BiCache<String, TaskGroupKey> reservations) {
 
     this.assigner = requireNonNull(assigner);
     this.preemptor = requireNonNull(preemptor);
     this.executorSettings = requireNonNull(executorSettings);
+    this.tierManager = requireNonNull(tierManager);
     this.reservations = requireNonNull(reservations);
   }
 
   @Timed("task_schedule_attempt")
-  public Set<String> schedule(Storage.MutableStoreProvider store, Iterable<String> taskIds) {
+  public Set<String> schedule(MutableStoreProvider store, Set<String> taskIds) {
     try {
       return scheduleTasks(store, taskIds);
     } catch (RuntimeException e) {
@@ -115,77 +117,67 @@ public class TaskSchedulerImpl implements TaskScheduler {
     }
   }
 
-  private Set<String> scheduleTasks(Storage.MutableStoreProvider store, Iterable<String> tasks) {
-    ImmutableSet<String> taskIds = ImmutableSet.copyOf(tasks);
-    String taskIdValues = Joiner.on(",").join(taskIds);
-    LOG.debug("Attempting to schedule tasks {}", taskIdValues);
-    ImmutableSet<IAssignedTask> assignedTasks =
-        ImmutableSet.copyOf(Iterables.transform(
-            store.getTaskStore().fetchTasks(Query.taskScoped(taskIds).byStatus(PENDING)),
-            IScheduledTask::getAssignedTask));
-
-    if (Iterables.isEmpty(assignedTasks)) {
-      LOG.warn("Failed to look up all tasks in a scheduling round: {}", taskIdValues);
-      return taskIds;
+  private Map<String, IAssignedTask> fetchTasks(StoreProvider store, Set<String> ids) {
+    Map<String, IAssignedTask> tasks = store.getTaskStore()
+        .fetchTasks(Query.taskScoped(ids).byStatus(PENDING))
+        .stream()
+        .map(IScheduledTask::getAssignedTask)
+        .collect(Collectors.toMap(
+            IAssignedTask::getTaskId,
+            Function.identity()
+        ));
+
+    if (ids.size() != tasks.size()) {
+      LOG.warn("Failed to look up tasks "
+          + Joiner.on(", ").join(Sets.difference(ids, tasks.keySet())));
     }
+    return tasks;
+  }
 
-    Preconditions.checkState(
-        assignedTasks.stream()
-            .collect(Collectors.groupingBy(t -> t.getTask()))
-            .entrySet()
-            .size() == 1,
-        "Found multiple task groups for %s",
-        taskIdValues);
-
-    Map<String, IAssignedTask> assignableTaskMap =
-        assignedTasks.stream().collect(toMap(t -> t.getTaskId(), t -> t));
+  private Set<String> scheduleTasks(MutableStoreProvider store, Set<String> ids) {
+    LOG.debug("Attempting to schedule tasks {}", ids);
+    Map<String, IAssignedTask> tasksById = fetchTasks(store, ids);
 
-    if (taskIds.size() != assignedTasks.size()) {
-      LOG.warn("Failed to look up tasks "
-          + Joiner.on(", ").join(Sets.difference(taskIds, assignableTaskMap.keySet())));
+    if (tasksById.isEmpty()) {
+      // None of the tasks were found in storage.  This could be caused by a task group that was
+      // killed by the user, for example.
+      return ids;
     }
 
-    // This is safe after all checks above.
-    ITaskConfig task = assignedTasks.stream().findFirst().get().getTask();
+    // Prepare scheduling context for the tasks
+    ITaskConfig task = Iterables.getOnlyElement(tasksById.values().stream()
+        .map(IAssignedTask::getTask)
+        .collect(Collectors.toSet()));
     AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob());
 
-    // Valid Docker tasks can have a container but no executor config
-    ResourceBag overhead = ResourceBag.EMPTY;
-    if (task.isSetExecutorConfig()) {
-      overhead = executorSettings.getExecutorOverhead(task.getExecutorConfig().getName())
-          .orElseThrow(
-              () -> new IllegalArgumentException("Cannot find executor configuration"));
-    }
-
+    // Attempt to schedule using available resources.
     Set<String> launched = assigner.maybeAssign(
         store,
-        new SchedulingFilter.ResourceRequest(
-            task,
-            bagFromResources(task.getResources()).add(overhead), aggregate),
+        ResourceRequest.fromTask(task, executorSettings, aggregate, tierManager),
         TaskGroupKey.from(task),
-        assignedTasks,
+        ImmutableSet.copyOf(tasksById.values()),
         reservations.asMap());
 
-    attemptsFired.addAndGet(assignableTaskMap.size());
-    Set<String> failedToLaunch = Sets.difference(assignableTaskMap.keySet(), launched);
+    attemptsFired.addAndGet(tasksById.size());
 
-    failedToLaunch.forEach(taskId -> {
-      // Task could not be scheduled.
+    // Fall back to preemption for tasks not scheduled above.
+    Set<String> unassigned = Sets.difference(tasksById.keySet(), launched);
+    unassigned.forEach(taskId -> {
       // TODO(maxim): Now that preemption slots are searched asynchronously, consider
       // retrying a launch attempt within the current scheduling round IFF a reservation is
       // available.
-      maybePreemptFor(assignableTaskMap.get(taskId), aggregate, store);
+      maybePreemptFor(tasksById.get(taskId), aggregate, store);
     });
-    attemptsNoMatch.addAndGet(failedToLaunch.size());
+    attemptsNoMatch.addAndGet(unassigned.size());
 
     // Return all successfully launched tasks as well as those weren't tried (not in PENDING).
-    return Sets.union(launched, Sets.difference(taskIds, assignableTaskMap.keySet()));
+    return Sets.union(launched, Sets.difference(ids, tasksById.keySet()));
   }
 
   private void maybePreemptFor(
       IAssignedTask task,
       AttributeAggregate jobState,
-      Storage.MutableStoreProvider storeProvider) {
+      MutableStoreProvider storeProvider) {
 
     if (!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) {
       return;

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
index 1219215..ebb345d 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
@@ -13,6 +13,7 @@
  */
 package org.apache.aurora.scheduler.storage;
 
+import java.util.Collection;
 import java.util.Optional;
 import java.util.Set;
 
@@ -47,7 +48,7 @@ public interface TaskStore {
    * @param query Builder of the query to identify tasks with.
    * @return A read-only view of matching tasks.
    */
-  Iterable<IScheduledTask> fetchTasks(Query.Builder query);
+  Collection<IScheduledTask> fetchTasks(Query.Builder query);
 
   /**
    * Fetches all job keys represented in the task store.

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java
index dea8e69..8d70cae 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java
@@ -13,6 +13,7 @@
  */
 package org.apache.aurora.scheduler.storage.durability;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -327,7 +328,7 @@ public class WriteRecorder implements
   }
 
   @Override
-  public Iterable<IScheduledTask> fetchTasks(Query.Builder query) {
+  public Collection<IScheduledTask> fetchTasks(Query.Builder query) {
     return this.taskStore.fetchTasks(query);
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java b/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java
index b6909a6..59eca7b 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java
@@ -14,12 +14,9 @@
 package org.apache.aurora.scheduler.updater;
 
 import java.util.Optional;
-import java.util.Set;
 
-import com.google.common.collect.ImmutableSet;
 import com.google.inject.Inject;
 
-import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.preemptor.BiCache;
 import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
 import org.slf4j.Logger;
@@ -58,21 +55,13 @@ public interface UpdateAgentReserver {
   Optional<String> getAgent(IInstanceKey key);
 
   /**
-   * Get all reservations for a given agent id. Useful for skipping over the agent between the
+   * Checks whether an agent is currently reserved. Useful for skipping over the agent between the
    * reserve/release window.
    *
    * @param agentId The agent id to look up reservations for.
    * @return A set of keys reserved for that agent.
    */
-  Set<IInstanceKey> getReservations(String agentId);
-
-  /**
-   * Check if the agent reserver has any reservations for the provided key.
-   *
-   * @param groupKey The key to check.
-   * @return True if there are reservations against any instances in that key.
-   */
-  boolean hasReservations(TaskGroupKey groupKey);
+  boolean isReserved(String agentId);
 
   /**
    * Implementation of the update reserver backed by a BiCache (the same mechanism we use for
@@ -99,16 +88,9 @@ public interface UpdateAgentReserver {
       cache.remove(key, agentId);
     }
 
-    public Set<IInstanceKey> getReservations(String agentId) {
-      return cache.getByValue(agentId);
-    }
-
     @Override
-    public boolean hasReservations(TaskGroupKey groupKey) {
-      return cache.asMap().entrySet().stream()
-          .filter(entry -> entry.getKey().getJobKey().equals(groupKey.getTask().getJob()))
-          .findFirst()
-          .isPresent();
+    public boolean isReserved(String agentId) {
+      return !cache.getByValue(agentId).isEmpty();
     }
 
     @Override
@@ -137,12 +119,7 @@ public interface UpdateAgentReserver {
     }
 
     @Override
-    public Set<IInstanceKey> getReservations(String agentId) {
-      return ImmutableSet.of();
-    }
-
-    @Override
-    public boolean hasReservations(TaskGroupKey groupKey) {
+    public boolean isReserved(String agentId) {
       return false;
     }
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java b/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java
index 82e40d5..f116e3a 100644
--- a/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java
@@ -48,6 +48,13 @@ public class TierManagerTest {
   }
 
   @Test
+  public void testDefaultTier() {
+    assertEquals(
+        DEV_TIER,
+        TIER_MANAGER.getTier(ITaskConfig.build(new TaskConfig())));
+  }
+
+  @Test
   public void testGetTierRevocableAndProduction() {
     assertEquals(
         REVOCABLE_TIER,

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
index 64d7a44..7136711 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
@@ -23,13 +23,12 @@ import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.TaskVars;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.metadata.NearestFit;
-import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -53,8 +52,7 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
   private static final UnusedResource RESOURCE = new UnusedResource(
       ResourceManager.bagFromResources(TASK.getResources()),
       IHostAttributes.build(new HostAttributes().setHost("host").setMode(MaintenanceMode.NONE)));
-  private static final ResourceRequest REQUEST =
-      new ResourceRequest(TASK, ResourceBag.EMPTY, AttributeAggregate.empty());
+  private static final ResourceRequest REQUEST = TaskTestUtil.toResourceRequest(TASK);
 
   private static final Veto VETO_1 = Veto.insufficientResources("ram", 1);
   private static final Veto VETO_2 = Veto.insufficientResources("ram", 2);

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index 0de90d7..21d4e47 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -38,14 +38,13 @@ import org.apache.aurora.gen.TaskConstraint;
 import org.apache.aurora.gen.ValueConstraint;
 import org.apache.aurora.gen.apiConstants;
 import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoGroup;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType;
-import org.apache.aurora.scheduler.mesos.TaskExecutors;
 import org.apache.aurora.scheduler.resources.ResourceBag;
-import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.entities.IAttribute;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
@@ -57,8 +56,10 @@ import org.junit.Test;
 import static org.apache.aurora.gen.Resource.diskMb;
 import static org.apache.aurora.gen.Resource.numCpus;
 import static org.apache.aurora.gen.Resource.ramMb;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.TIER_MANAGER;
 import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
 import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
+import static org.apache.aurora.scheduler.mesos.TaskExecutors.NO_OVERHEAD_EXECUTOR;
 import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources;
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
@@ -135,22 +136,22 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         none,
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostA),
-            new ResourceRequest(noPortTask, bag(noPortTask), empty())));
+            TaskTestUtil.toResourceRequest(noPortTask)));
     assertEquals(
         none,
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostA),
-            new ResourceRequest(onePortTask, bag(onePortTask), empty())));
+            TaskTestUtil.toResourceRequest(onePortTask)));
     assertEquals(
         none,
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostA),
-            new ResourceRequest(twoPortTask, bag(twoPortTask), empty())));
+            TaskTestUtil.toResourceRequest(twoPortTask)));
     assertEquals(
         ImmutableSet.of(veto(PORTS, 1)),
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostA),
-            new ResourceRequest(threePortTask, bag(threePortTask), empty())));
+            TaskTestUtil.toResourceRequest(threePortTask)));
   }
 
   @Test
@@ -238,13 +239,12 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         DEFAULT_OFFER,
         hostAttributes(HOST_A),
         Optional.of(start));
-    ResourceRequest request = new ResourceRequest(task, bag(task), empty());
 
     control.replay();
 
     assertEquals(
         ImmutableSet.of(Veto.maintenance("draining")),
-        defaultFilter.filter(unusedResource, request));
+        defaultFilter.filter(unusedResource, TaskTestUtil.toResourceRequest(task)));
   }
 
   @Test
@@ -262,14 +262,12 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         DEFAULT_OFFER,
         hostAttributes(HOST_A),
         Optional.of(start));
-    ResourceRequest request = new ResourceRequest(task, bag(task), empty());
 
     control.replay();
 
     assertEquals(
         ImmutableSet.of(),
-        defaultFilter.filter(unusedResource, request));
-
+        defaultFilter.filter(unusedResource, TaskTestUtil.toResourceRequest(task)));
   }
 
   @Test
@@ -350,7 +348,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testLimitWithinJob() throws Exception {
+  public void testLimitWithinJob() {
     control.replay();
 
     AttributeAggregate stateA = AttributeAggregate.create(
@@ -465,7 +463,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         ImmutableSet.of(),
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostA),
-            new ResourceRequest(task, bag(task), empty())));
+            TaskTestUtil.toResourceRequest(task)));
 
     Constraint jvmNegated = jvmConstraint.deepCopy();
     jvmNegated.getConstraint().getValue().setNegated(true);
@@ -577,7 +575,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         expected,
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostAttributes),
-            new ResourceRequest(task, bag(task), aggregate))
+            TaskTestUtil.toResourceRequest(task))
             .isEmpty());
 
     Constraint negated = constraint.deepCopy();
@@ -587,7 +585,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         !expected,
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostAttributes),
-            new ResourceRequest(negatedTask, bag(negatedTask), aggregate))
+            ResourceRequest.fromTask(negatedTask, NO_OVERHEAD_EXECUTOR, aggregate, TIER_MANAGER))
             .isEmpty());
     return task;
   }
@@ -618,7 +616,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
         ImmutableSet.copyOf(vetoes),
         defaultFilter.filter(
             new UnusedResource(DEFAULT_OFFER, hostAttributes),
-            new ResourceRequest(task, bag(task), jobState)));
+            ResourceRequest.fromTask(task, NO_OVERHEAD_EXECUTOR, jobState, TIER_MANAGER)));
   }
 
   private static IHostAttributes hostAttributes(
@@ -686,10 +684,4 @@ public class SchedulingFilterImplTest extends EasyMockTest {
   private ITaskConfig makeTask() {
     return makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK);
   }
-
-  private ResourceBag bag(ITaskConfig task) {
-    return ResourceManager.bagFromResources(task.getResources())
-        .add(TaskExecutors.NO_OVERHEAD_EXECUTOR.getExecutorOverhead(
-            task.getExecutorConfig().getName()).get());
-  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
index c27a662..686087e 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java
@@ -38,6 +38,7 @@ import org.apache.aurora.scheduler.configuration.executor.ExecutorConfig;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory.MesosTaskFactoryImpl;
 import org.apache.aurora.scheduler.resources.ResourceBag;
+import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
@@ -124,15 +125,13 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
       .setAgentId(SLAVE)
       .setHostname("slave-hostname")
       .addAllResources(mesosScalarFromBag(bagFromResources(
-              TASK_CONFIG.getResources()).add(THERMOS_EXECUTOR.getExecutorOverhead(
-                  TASK_CONFIG.getExecutorConfig().getName()).get())))
+              TASK_CONFIG.getResources()).add(THERMOS_EXECUTOR.getExecutorOverhead(TASK_CONFIG))))
       .addResources(mesosRange(PORTS, 80))
       .build();
   private static final Offer OFFER_SOME_OVERHEAD_EXECUTOR = OFFER_THERMOS_EXECUTOR.toBuilder()
       .clearResources()
       .addAllResources(mesosScalarFromBag(bagFromResources(
-          TASK_CONFIG.getResources()).add(SOME_OVERHEAD_EXECUTOR.getExecutorOverhead(
-              TASK_CONFIG.getExecutorConfig().getName()).get())))
+          TASK_CONFIG.getResources()).add(SOME_OVERHEAD_EXECUTOR.getExecutorOverhead(TASK_CONFIG))))
       .addResources(mesosRange(PORTS, 80))
       .build();
 
@@ -282,10 +281,7 @@ public class MesosTaskFactoryImplTest extends EasyMockTest {
     ResourceBag executorResources =
         bagFromMesosResources(taskInfo.getExecutor().getResourcesList());
 
-    assertEquals(
-        bagFromResources(task.getResources()).add(
-            config.getExecutorOverhead(task.getExecutorConfig().getName()).get()),
-        taskResources.add(executorResources));
+    assertEquals(ResourceManager.bagFromTask(task, config), taskResources.add(executorResources));
   }
 
   private void checkDiscoveryInfoUnset(TaskInfo taskInfo) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
index 1e9532b..28224a5 100644
--- a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
@@ -29,6 +29,7 @@ import org.apache.aurora.common.util.testing.FakeTicker;
 import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged;
@@ -37,7 +38,6 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.offers.Deferment.Noop;
-import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -57,7 +57,6 @@ import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
 import static org.apache.aurora.gen.MaintenanceMode.NONE;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
 import static org.apache.aurora.scheduler.offers.OfferManagerImpl.GLOBALLY_BANNED_OFFERS;
 import static org.apache.aurora.scheduler.offers.OfferManagerImpl.OFFER_ACCEPT_RACES;
 import static org.apache.aurora.scheduler.offers.OfferManagerImpl.OFFER_CANCEL_FAILURES;
@@ -100,10 +99,8 @@ public class OfferManagerImplTest extends EasyMockTest {
   private static final int PORT = 1000;
   private static final Protos.Offer MESOS_OFFER = offer(mesosRange(PORTS, PORT));
   private static final IScheduledTask TASK = makeTask("id", JOB);
-  private static final ResourceRequest EMPTY_REQUEST = new ResourceRequest(
-      TASK.getAssignedTask().getTask(),
-      ResourceBag.EMPTY,
-      empty());
+  private static final ResourceRequest EMPTY_REQUEST =
+      TaskTestUtil.toResourceRequest(TASK.getAssignedTask().getTask());
   private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask());
   private static final TaskInfo TASK_INFO = TaskInfo.newBuilder()
       .setName("taskName")
@@ -250,14 +247,14 @@ public class OfferManagerImplTest extends EasyMockTest {
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     offerManager.add(OFFER_A);
     assertEquals(OFFER_A,
-        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
 
     // Add static ban.
     offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY);
     assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
-    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
   }
 
   @Test
@@ -269,14 +266,14 @@ public class OfferManagerImplTest extends EasyMockTest {
     offerManager.add(OFFER_A);
     offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY);
     assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
-    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
 
     // Make sure the static ban expires after maximum amount of time an offer is held.
     FAKE_TICKER.advance(RETURN_DELAY);
     offerManager.cleanupStaticBans();
     assertEquals(OFFER_A,
-        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
   }
 
@@ -289,7 +286,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     offerManager.add(OFFER_A);
     offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY);
     assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
-    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
 
     // Make sure the static ban is cleared when driver is disconnected.
@@ -297,7 +294,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     offerManager.add(OFFER_A);
     assertEquals(OFFER_A,
-        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
   }
 
   @Test
@@ -361,14 +358,14 @@ public class OfferManagerImplTest extends EasyMockTest {
     offerManager.add(OFFER_A);
     assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
     assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
-    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
 
     offerManager.cancel(OFFER_A_ID);
     offerManager.add(OFFER_A);
     assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
     assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
     assertEquals(OFFER_A,
-        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
   }
 
   private static HostOffer setUnavailability(HostOffer offer, long startMs) {
@@ -423,7 +420,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     cpuManager.add(small);
 
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(ImmutableList.of(small, medium, large),
         ImmutableList.copyOf(cpuManager.getAll()));
   }
@@ -460,7 +457,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     cpuManager.add(small);
 
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, true)));
+        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(ImmutableList.of(small, medium, large),
         ImmutableList.copyOf(cpuManager.getAll()));
   }
@@ -488,7 +485,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     cpuManager.add(small);
 
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(ImmutableList.of(small, medium, large),
         ImmutableList.copyOf(cpuManager.getAll()));
   }
@@ -516,7 +513,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     cpuManager.add(small);
 
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(ImmutableList.of(small, medium, large),
         ImmutableList.copyOf(cpuManager.getAll()));
   }
@@ -555,7 +552,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     cpuManager.add(small);
 
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(ImmutableList.of(small, medium, large),
         ImmutableList.copyOf(cpuManager.getAll()));
   }
@@ -628,7 +625,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     control.replay();
     offerManager.add(OFFER_A);
     assertEquals(Optional.of(OFFER_A),
-        offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST, false));
+        offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST));
   }
 
   @Test
@@ -640,7 +637,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
     offerManager.ban(OFFER_A_ID);
     assertEquals(Optional.empty(),
-        offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST, false));
+        offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST));
     assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
   }
 
@@ -655,7 +652,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     offerManager.add(OFFER_A);
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     assertEquals(Optional.empty(),
-        offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST, false));
+        offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST));
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
   }
 
@@ -669,7 +666,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     offerManager.add(OFFER_C);
     assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
     assertEquals(ImmutableSet.of(OFFER_A, OFFER_B, OFFER_C),
-        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(3, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
   }
 
@@ -685,7 +682,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
     offerManager.ban(OFFER_B.getOffer().getId());
     assertEquals(ImmutableSet.of(OFFER_A, OFFER_C),
-        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(2, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
     assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
   }
@@ -702,7 +699,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     offerManager.banForTaskGroup(OFFER_B.getOffer().getId(), GROUP_KEY);
     assertEquals(ImmutableSet.of(OFFER_A, OFFER_C),
-        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(2, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
     assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     assertEquals(ImmutableSet.of(Pair.of(OFFER_B.getOffer().getId(), GROUP_KEY)),
@@ -727,7 +724,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
     assertEquals(ImmutableSet.of(OFFER_A),
-        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(1, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
     assertEquals(ImmutableSet.of(empty, OFFER_A),
         ImmutableSet.copyOf(offerManager.getAll()));
@@ -750,7 +747,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     assertEquals(ImmutableSet.of(OFFER_B),
-        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST)));
     assertEquals(3, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
     assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     assertEquals(ImmutableSet.of(Pair.of(OFFER_A.getOffer().getId(), GROUP_KEY)),


Mime
View raw message