aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dmclaugh...@apache.org
Subject aurora git commit: Add the ability to customize scheduling logic.
Date Thu, 25 May 2017 06:18:40 GMT
Repository: aurora
Updated Branches:
  refs/heads/master 1fbf7732d -> 92f6d9f64


Add the ability to customize scheduling logic.

Uses Guice module injection to enable replacing the first-fit scheduling algorithm and associated first-fit preemption logic.

See design/proposal document here: https://docs.google.com/document/d/1fVHLt9AF-YbOCVCDMQmi5DATVusn-tqY8DldKbjVEm0/edit?usp=sharing

Bugs closed: AURORA-1920

Reviewed at https://reviews.apache.org/r/59039/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/92f6d9f6
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/92f6d9f6
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/92f6d9f6

Branch: refs/heads/master
Commit: 92f6d9f6443174a9ae3dc001967c208add2149ed
Parents: 1fbf773
Author: David McLaughlin <david@dmclaughlin.com>
Authored: Wed May 24 22:51:54 2017 -0700
Committer: David McLaughlin <dmclaughlin@twitter.com>
Committed: Wed May 24 22:51:54 2017 -0700

----------------------------------------------------------------------
 RELEASE-NOTES.md                                |   3 +
 docs/development/design-documents.md            |   1 +
 docs/reference/scheduler-configuration.md       |   4 +
 .../preemptor/PendingTaskProcessorModule.java   |  30 ++
 .../preemptor/PreemptionVictimFilterModule.java |  30 ++
 .../scheduler/preemptor/PreemptorModule.java    |  62 ++-
 .../state/FirstFitTaskAssignerModule.java       |  31 ++
 .../aurora/scheduler/state/StateModule.java     |  29 +-
 .../aurora/scheduler/state/TaskAssigner.java    |   6 +-
 .../preemptor/PreemptorModuleTest.java          |  10 +-
 .../state/FirstFitTaskAssignerTest.java         | 516 +++++++++++++++++++
 .../scheduler/state/TaskAssignerImplTest.java   | 516 -------------------
 12 files changed, 706 insertions(+), 532 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/92f6d9f6/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 77376e4..75b3ddb 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -25,6 +25,9 @@
   instances. See [here](docs/reference/client-commands.md#scping-with-task-machines) for details.
   Currently only fully supported for Mesos containers (you can copy files from the Docker container
   sandbox but you cannot send files to it).
+- Added ability to inject your own scheduling logic, via a lazy Guice module binding. This is an
+  alpha-level feature and not subject to backwards compatibility considerations. You can specify
+  your custom modules using the `task_assigner_modules` and `preemption_slot_finder_modules` options.
 
 0.17.0
 ======

http://git-wip-us.apache.org/repos/asf/aurora/blob/92f6d9f6/docs/development/design-documents.md
----------------------------------------------------------------------
diff --git a/docs/development/design-documents.md b/docs/development/design-documents.md
index c942643..9c714e4 100644
--- a/docs/development/design-documents.md
+++ b/docs/development/design-documents.md
@@ -18,5 +18,6 @@ Current and past documents:
 * [Supporting the Mesos Universal Containerizer](https://docs.google.com/document/d/111T09NBF2zjjl7HE95xglsDpRdKoZqhCRM5hHmOfTLA/edit?usp=sharing)
 * [Tier Management In Apache Aurora](https://docs.google.com/document/d/1erszT-HsWf1zCIfhbqHlsotHxWUvDyI2xUwNQQQxLgs/edit?usp=sharing)
 * [Ubiquitous Jobs](https://docs.google.com/document/d/12hr6GnUZU3mc7xsWRzMi3nQILGB-3vyUxvbG-6YmvdE/edit)
+* [Pluggable Scheduling](https://docs.google.com/document/d/1fVHLt9AF-YbOCVCDMQmi5DATVusn-tqY8DldKbjVEm0/edit)
 
 Design documents can be found in the Aurora issue tracker via the query [`project = AURORA AND text ~ "docs.google.com" ORDER BY created`](https://issues.apache.org/jira/browse/AURORA-1528?jql=project%20%3D%20AURORA%20AND%20text%20~%20%22docs.google.com%22%20ORDER%20BY%20created).

http://git-wip-us.apache.org/repos/asf/aurora/blob/92f6d9f6/docs/reference/scheduler-configuration.md
----------------------------------------------------------------------
diff --git a/docs/reference/scheduler-configuration.md b/docs/reference/scheduler-configuration.md
index 3e3d799..3d53c5a 100644
--- a/docs/reference/scheduler-configuration.md
+++ b/docs/reference/scheduler-configuration.md
@@ -190,6 +190,8 @@ Optional flags:
 	If true, Aurora populates DiscoveryInfo field of Mesos TaskInfo.
 -preemption_delay (default (3, mins))
 	Time interval after which a pending task becomes eligible to preempt other tasks
+-preemption_slot_finder_modules (default [class org.apache.aurora.scheduler.preemptor.PendingTaskProcessorModule, class org.apache.aurora.scheduler.preemptor.PreemptionVictimFilterModule])
+  Guice modules for replacing preemption logic.
 -preemption_slot_hold_time (default (5, mins))
 	Time to hold a preemption slot found before it is discarded.
 -preemption_slot_search_interval (default (1, mins))
@@ -234,6 +236,8 @@ Optional flags:
 	Time for a stat to be retained in memory before expiring.
 -stat_sampling_interval (default (1, secs))
 	Statistic value sampling interval.
+-task_assigner_modules (default [class org.apache.aurora.scheduler.state.FirstFitTaskAssignerModule])
+  Guice modules for replacing task assignment logic.
 -thermos_executor_cpu (default 0.25)
 	The number of CPU cores to allocate for each instance of the executor.
 -thermos_executor_flags

http://git-wip-us.apache.org/repos/asf/aurora/blob/92f6d9f6/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorModule.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorModule.java
new file mode 100644
index 0000000..b943f74
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorModule.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.preemptor;
+
+import javax.inject.Singleton;
+
+import com.google.inject.AbstractModule;
+
+/**
+ * Module to install the default pending task slot-finder implementation.
+ */
+public class PendingTaskProcessorModule extends AbstractModule {
+  @Override
+  protected void configure() {
+    bind(Runnable.class).annotatedWith(PreemptorModule.PreemptionSlotFinder.class)
+        .to(PendingTaskProcessor.class);
+    bind(PendingTaskProcessor.class).in(Singleton.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/92f6d9f6/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterModule.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterModule.java
new file mode 100644
index 0000000..582f660
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterModule.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.preemptor;
+
+import javax.inject.Singleton;
+
+import com.google.inject.AbstractModule;
+
+/**
+ * Module to install the default preemption victim filter implementation.
+ */
+public class PreemptionVictimFilterModule extends AbstractModule {
+  @Override
+  protected void configure() {
+    bind(PreemptionVictimFilter.class)
+        .to(PreemptionVictimFilter.PreemptionVictimFilterImpl.class);
+    bind(PreemptionVictimFilter.PreemptionVictimFilterImpl.class).in(Singleton.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/92f6d9f6/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java
index 92087eb..b3ca1a3 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java
@@ -13,13 +13,19 @@
  */
 package org.apache.aurora.scheduler.preemptor;
 
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.Set;
 import javax.inject.Inject;
+import javax.inject.Qualifier;
 import javax.inject.Singleton;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.AbstractScheduledService;
 import com.google.inject.AbstractModule;
+import com.google.inject.Module;
 import com.google.inject.PrivateModule;
 import com.google.inject.TypeLiteral;
 
@@ -29,12 +35,17 @@ import org.apache.aurora.common.args.constraints.Positive;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.apache.aurora.scheduler.app.MoreModules;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.preemptor.BiCache.BiCacheSettings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
 
 public class PreemptorModule extends AbstractModule {
@@ -65,22 +76,54 @@ public class PreemptorModule extends AbstractModule {
       help = "The maximum number of reservations for a task group to be made in a batch.")
   private static final Arg<Integer> RESERVATION_MAX_BATCH_SIZE = Arg.create(5);
 
+  @CmdLine(name = "preemption_slot_finder_modules",
+      help = "Guice modules for custom preemption slot searching for pending tasks.")
+  private static final Arg<Set<Module>> SLOT_FINDER_MODULES = Arg.create(
+      ImmutableSet.of(
+          MoreModules.lazilyInstantiated(PendingTaskProcessorModule.class),
+          MoreModules.lazilyInstantiated(PreemptionVictimFilterModule.class)));
+
   private final boolean enablePreemptor;
   private final Amount<Long, Time> preemptionDelay;
   private final Amount<Long, Time> slotSearchInterval;
   private final Integer reservationBatchSize;
+  private final Set<Module> slotFinderModules;
+
+  /*
+   * Binding annotation for the async processor that finds preemption slots.
+   */
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  public @interface PreemptionSlotFinder { }
 
   @VisibleForTesting
   public PreemptorModule(
       boolean enablePreemptor,
       Amount<Long, Time> preemptionDelay,
       Amount<Long, Time> slotSearchInterval,
-      Integer reservationBatchSize) {
+      Integer reservationBatchSize,
+      Set<Module> slotFinderModules) {
 
     this.enablePreemptor = enablePreemptor;
     this.preemptionDelay = requireNonNull(preemptionDelay);
     this.slotSearchInterval = requireNonNull(slotSearchInterval);
     this.reservationBatchSize = requireNonNull(reservationBatchSize);
+    this.slotFinderModules = requireNonNull(slotFinderModules);
+  }
+
+  @VisibleForTesting
+  public PreemptorModule(
+      boolean enablePreemptor,
+      Amount<Long, Time> preemptionDelay,
+      Amount<Long, Time> slotSearchInterval,
+      Integer reservationBatchSize) {
+
+    this(
+        enablePreemptor,
+        preemptionDelay,
+        slotSearchInterval,
+        reservationBatchSize,
+        SLOT_FINDER_MODULES.get());
   }
 
   public PreemptorModule() {
@@ -88,7 +131,8 @@ public class PreemptorModule extends AbstractModule {
         ENABLE_PREEMPTOR.get(),
         PREEMPTION_DELAY.get(),
         PREEMPTION_SLOT_SEARCH_INTERVAL.get(),
-        RESERVATION_MAX_BATCH_SIZE.get());
+        RESERVATION_MAX_BATCH_SIZE.get(),
+        SLOT_FINDER_MODULES.get());
   }
 
   @Override
@@ -99,9 +143,6 @@ public class PreemptorModule extends AbstractModule {
         if (enablePreemptor) {
           LOG.info("Preemptor Enabled.");
           bind(PreemptorMetrics.class).in(Singleton.class);
-          bind(PreemptionVictimFilter.class)
-              .to(PreemptionVictimFilter.PreemptionVictimFilterImpl.class);
-          bind(PreemptionVictimFilter.PreemptionVictimFilterImpl.class).in(Singleton.class);
           bind(Preemptor.class).to(Preemptor.PreemptorImpl.class);
           bind(Preemptor.PreemptorImpl.class).in(Singleton.class);
           bind(new TypeLiteral<Amount<Long, Time>>() { })
@@ -114,11 +155,14 @@ public class PreemptorModule extends AbstractModule {
           bind(new TypeLiteral<Integer>() { })
               .annotatedWith(PendingTaskProcessor.ReservationBatchSize.class)
               .toInstance(reservationBatchSize);
-          bind(PendingTaskProcessor.class).in(Singleton.class);
           bind(ClusterState.class).to(ClusterStateImpl.class);
           bind(ClusterStateImpl.class).in(Singleton.class);
           expose(ClusterStateImpl.class);
 
+          for (Module module: slotFinderModules) {
+            install(module);
+          }
+
           bind(PreemptorService.class).in(Singleton.class);
           bind(AbstractScheduledService.Scheduler.class).toInstance(
               AbstractScheduledService.Scheduler.newFixedRateSchedule(
@@ -127,7 +171,7 @@ public class PreemptorModule extends AbstractModule {
                   slotSearchInterval.getUnit().getTimeUnit()));
 
           expose(PreemptorService.class);
-          expose(PendingTaskProcessor.class);
+          expose(Runnable.class).annotatedWith(PreemptionSlotFinder.class);
         } else {
           bind(Preemptor.class).toInstance(NULL_PREEMPTOR);
           LOG.warn("Preemptor Disabled.");
@@ -147,11 +191,11 @@ public class PreemptorModule extends AbstractModule {
   }
 
   static class PreemptorService extends AbstractScheduledService {
-    private final PendingTaskProcessor slotFinder;
+    private final Runnable slotFinder;
     private final Scheduler schedule;
 
     @Inject
-    PreemptorService(PendingTaskProcessor slotFinder, Scheduler schedule) {
+    PreemptorService(@PreemptionSlotFinder Runnable slotFinder, Scheduler schedule) {
       this.slotFinder = requireNonNull(slotFinder);
       this.schedule = requireNonNull(schedule);
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/92f6d9f6/src/main/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerModule.java b/src/main/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerModule.java
new file mode 100644
index 0000000..dc244ee
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerModule.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.state;
+
+import javax.inject.Singleton;
+
+import com.google.inject.AbstractModule;
+
+import org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner;
+
+/**
+ *  Exposes the default TaskAssigner implementation, which is a first-fit scheduling algorithm.
+ */
+public class FirstFitTaskAssignerModule extends AbstractModule {
+  @Override
+  protected void configure() {
+    bind(TaskAssigner.class).to(FirstFitTaskAssigner.class);
+    bind(FirstFitTaskAssigner.class).in(Singleton.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/92f6d9f6/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
index 0186484..77a37b8 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
@@ -13,28 +13,51 @@
  */
 package org.apache.aurora.scheduler.state;
 
+import java.util.Set;
 import javax.inject.Singleton;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
+import com.google.inject.Module;
 
+import org.apache.aurora.common.args.Arg;
+import org.apache.aurora.common.args.CmdLine;
+import org.apache.aurora.scheduler.app.MoreModules;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory.MesosTaskFactoryImpl;
 import org.apache.aurora.scheduler.state.MaintenanceController.MaintenanceControllerImpl;
-import org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl;
 import org.apache.aurora.scheduler.state.UUIDGenerator.UUIDGeneratorImpl;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * Binding module for scheduling logic and higher-level state management.
  */
 public class StateModule extends AbstractModule {
 
+  @CmdLine(name = "task_assigner_modules",
+      help = "Guice modules for customizing task assignment.")
+  private static final Arg<Set<Module>> TASK_ASSIGNER_MODULES = Arg.create(
+      ImmutableSet.of(MoreModules.lazilyInstantiated(FirstFitTaskAssignerModule.class)));
+
+  private final Set<Module> assignerModules;
+
+  public StateModule() {
+    this(TASK_ASSIGNER_MODULES.get());
+  }
+
+  private StateModule(Set<Module> assignerModules) {
+    this.assignerModules = requireNonNull(assignerModules);
+  }
+
   @Override
   protected void configure() {
-    bind(TaskAssigner.class).to(TaskAssignerImpl.class);
-    bind(TaskAssignerImpl.class).in(Singleton.class);
+    for (Module module: assignerModules) {
+      install(module);
+    }
     bind(MesosTaskFactory.class).to(MesosTaskFactoryImpl.class);
 
     bind(StateManager.class).to(StateManagerImpl.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/92f6d9f6/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index 7c531af..25399e4 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -78,8 +78,8 @@ public interface TaskAssigner {
       Iterable<IAssignedTask> tasks,
       Map<String, TaskGroupKey> preemptionReservations);
 
-  class TaskAssignerImpl implements TaskAssigner {
-    private static final Logger LOG = LoggerFactory.getLogger(TaskAssignerImpl.class);
+  class FirstFitTaskAssigner implements TaskAssigner {
+    private static final Logger LOG = LoggerFactory.getLogger(FirstFitTaskAssigner.class);
 
     @VisibleForTesting
     static final Optional<String> LAUNCH_FAILED_MSG =
@@ -100,7 +100,7 @@ public interface TaskAssigner {
     private final UpdateAgentReserver updateAgentReserver;
 
     @Inject
-    public TaskAssignerImpl(
+    public FirstFitTaskAssigner(
         StateManager stateManager,
         SchedulingFilter filter,
         MesosTaskFactory taskFactory,

http://git-wip-us.apache.org/repos/asf/aurora/blob/92f6d9f6/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
index da064e3..3317133 100644
--- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
@@ -14,6 +14,7 @@
 package org.apache.aurora.scheduler.preemptor;
 
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -70,7 +71,14 @@ public class PreemptorModuleTest extends EasyMockTest {
         false,
         Amount.of(0L, Time.SECONDS),
         Amount.of(0L, Time.SECONDS),
-        5));
+        5,
+        ImmutableSet.of(new AbstractModule() {
+          @Override
+          protected void configure() {
+            bind(Runnable.class).annotatedWith(PreemptorModule.PreemptionSlotFinder.class)
+                .toInstance(createMock(Runnable.class));
+          }
+        })));
 
     control.replay();
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/92f6d9f6/src/test/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerTest.java b/src/test/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerTest.java
new file mode 100644
index 0000000..25c1137
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerTest.java
@@ -0,0 +1,516 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.state;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.base.InstanceKeys;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
+import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.resources.ResourceBag;
+import org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.apache.aurora.scheduler.updater.UpdateAgentReserver;
+import org.apache.mesos.v1.Protos.AgentID;
+import org.apache.mesos.v1.Protos.FrameworkID;
+import org.apache.mesos.v1.Protos.OfferID;
+import org.apache.mesos.v1.Protos.Resource;
+import org.apache.mesos.v1.Protos.TaskID;
+import org.apache.mesos.v1.Protos.TaskInfo;
+import org.apache.mesos.v1.Protos.Value.Range;
+import org.apache.mesos.v1.Protos.Value.Ranges;
+import org.apache.mesos.v1.Protos.Value.Type;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
+import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer;
+import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
+import static org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner.ASSIGNER_EVALUATED_OFFERS;
+import static org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner.ASSIGNER_LAUNCH_FAILURES;
+import static org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner.LAUNCH_FAILED_MSG;
+import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import static org.apache.mesos.v1.Protos.Offer;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class FirstFitTaskAssignerTest extends EasyMockTest {
+
+  private static final int PORT = 1000;
+  private static final Offer MESOS_OFFER = offer(mesosRange(PORTS, PORT));
+  private static final String SLAVE_ID = MESOS_OFFER.getAgentId().getValue();
+  private static final HostOffer OFFER =
+      new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes()
+          .setHost(MESOS_OFFER.getHostname())
+          .setAttributes(ImmutableSet.of(
+              new Attribute("host", ImmutableSet.of(MESOS_OFFER.getHostname()))))));
+  private static final IScheduledTask TASK = makeTask("id", JOB);
+  private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask());
+  private static final TaskInfo TASK_INFO = TaskInfo.newBuilder()
+      .setName("taskName")
+      .setTaskId(TaskID.newBuilder().setValue(Tasks.id(TASK)))
+      .setAgentId(MESOS_OFFER.getAgentId())
+      .build();
+  private static final IInstanceKey INSTANCE_KEY =
+      InstanceKeys.from(JOB, TASK.getAssignedTask().getInstanceId());
+  private static final Map<String, TaskGroupKey> NO_RESERVATION = ImmutableMap.of();
+  private static final UnusedResource UNUSED = new UnusedResource(
+      bagFromMesosResources(MESOS_OFFER.getResourcesList()),
+      OFFER.getAttributes());
+  private static final HostOffer OFFER_2 = new HostOffer(
+      Offer.newBuilder()
+          .setId(OfferID.newBuilder().setValue("offerId0"))
+              .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId"))
+              .setAgentId(AgentID.newBuilder().setValue("slaveId0"))
+              .setHostname("hostName0")
+          .addResources(Resource.newBuilder()
+          .setName("ports")
+          .setType(Type.RANGES)
+          .setRanges(
+              Ranges.newBuilder().addRange(Range.newBuilder().setBegin(PORT).setEnd(PORT))))
+              .build(),
+      IHostAttributes.build(new HostAttributes()));
+
+  private static final Set<String> NO_ASSIGNMENT = ImmutableSet.of();
+
+  private ResourceRequest resourceRequest;
+
+  private MutableStoreProvider storeProvider;
+  private StateManager stateManager;
+  private SchedulingFilter filter;
+  private MesosTaskFactory taskFactory;
+  private OfferManager offerManager;
+  private FirstFitTaskAssigner assigner;
+  private TierManager tierManager;
+  private FakeStatsProvider statsProvider;
+  private UpdateAgentReserver updateAgentReserver;
+
+  @Before
+  public void setUp() throws Exception {
+    storeProvider = createMock(MutableStoreProvider.class);
+    filter = createMock(SchedulingFilter.class);
+    taskFactory = createMock(MesosTaskFactory.class);
+    stateManager = createMock(StateManager.class);
+    offerManager = createMock(OfferManager.class);
+    tierManager = createMock(TierManager.class);
+    updateAgentReserver = createMock(UpdateAgentReserver.class);
+    statsProvider = new FakeStatsProvider();
+    assigner = new FirstFitTaskAssigner(
+        stateManager,
+        filter,
+        taskFactory,
+        offerManager,
+        tierManager,
+        updateAgentReserver,
+        statsProvider);
+    resourceRequest = new ResourceRequest(
+        TASK.getAssignedTask().getTask(),
+        ResourceBag.EMPTY,
+        empty());
+  }
+
+  @Test
+  public void testAssignNoTasks() throws Exception {
+    control.replay();
+
+    assertEquals(
+        NO_ASSIGNMENT,
+        assigner.maybeAssign(storeProvider, null, null, ImmutableSet.of(), null));
+  }
+
+  @Test
+  public void testAssignPartialNoVetoes() throws Exception {
+    expectNoUpdateReservations(1);
+    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
+    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+    expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
+    expectAssignTask(MESOS_OFFER);
+    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER))
+        .andReturn(TASK_INFO);
+
+    control.replay();
+
+    AttributeAggregate aggregate = empty();
+    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+    assertEquals(
+        ImmutableSet.of(Tasks.id(TASK)),
+        assigner.maybeAssign(
+            storeProvider,
+            new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate),
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(
+                TASK.getAssignedTask(),
+                makeTask("id2", JOB).getAssignedTask(),
+                makeTask("id3", JOB).getAssignedTask()),
+            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+    assertNotEquals(empty(), aggregate);
+    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+  }
+
+  @Test
+  public void testAssignVetoesWithStaticBan() throws Exception {
+    expectNoUpdateReservations(1);
+    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
+    offerManager.banOffer(MESOS_OFFER.getId(), GROUP_KEY);
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+    expect(filter.filter(UNUSED, resourceRequest))
+        .andReturn(ImmutableSet.of(Veto.constraintMismatch("denied")));
+
+    control.replay();
+
+    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+    assertEquals(
+        NO_ASSIGNMENT,
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(TASK.getAssignedTask()),
+            NO_RESERVATION));
+    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+  }
+
+  @Test
+  public void testAssignVetoesWithNoStaticBan() throws Exception {
+    expectNoUpdateReservations(1);
+    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+    expect(filter.filter(UNUSED, resourceRequest))
+        .andReturn(ImmutableSet.of(Veto.unsatisfiedLimit("limit")));
+
+    control.replay();
+
+    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+    assertEquals(
+        NO_ASSIGNMENT,
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(TASK.getAssignedTask()),
+            NO_RESERVATION));
+    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+  }
+
+  @Test
+  public void testAssignmentClearedOnError() throws Exception {
+    expectNoUpdateReservations(1);
+    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER, OFFER_2));
+    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
+    expectLastCall().andThrow(new OfferManager.LaunchException("expected"));
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+    expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
+    expectAssignTask(MESOS_OFFER);
+    expect(stateManager.changeState(
+        storeProvider,
+        Tasks.id(TASK),
+        Optional.of(PENDING),
+        LOST,
+        LAUNCH_FAILED_MSG))
+        .andReturn(StateChangeResult.SUCCESS);
+    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER))
+        .andReturn(TASK_INFO);
+
+    control.replay();
+
+    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_LAUNCH_FAILURES));
+    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+    // Ensures scheduling loop terminates on the first launch failure.
+    assertEquals(
+        NO_ASSIGNMENT,
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(
+                TASK.getAssignedTask(),
+                makeTask("id2", JOB).getAssignedTask(),
+                makeTask("id3", JOB).getAssignedTask()),
+            NO_RESERVATION));
+    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_LAUNCH_FAILURES));
+    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+  }
+
+  @Test
+  public void testAssignmentSkippedForReservedSlave() throws Exception {
+    expectNoUpdateReservations(0);
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
+
+    control.replay();
+
+    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+    assertEquals(
+        NO_ASSIGNMENT,
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(TASK.getAssignedTask()),
+            ImmutableMap.of(SLAVE_ID, TaskGroupKey.from(
+                ITaskConfig.build(new TaskConfig().setJob(new JobKey("other", "e", "n")))))));
+    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+  }
+
+  @Test
+  public void testTaskWithReservedSlaveLandsElsewhere() throws Exception {
+    // Ensures slave/task reservation relationship is only enforced in slave->task direction
+    // and permissive in task->slave direction. In other words, a task with a slave reservation
+    // should still be tried against other unreserved slaves.
+    expectNoUpdateReservations(1);
+    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER_2, OFFER));
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+    expect(filter.filter(
+        new UnusedResource(
+            bagFromMesosResources(OFFER_2.getOffer().getResourcesList()),
+            OFFER_2.getAttributes()),
+        resourceRequest)).andReturn(ImmutableSet.of());
+    expectAssignTask(OFFER_2.getOffer());
+    expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER_2.getOffer()))
+        .andReturn(TASK_INFO);
+    offerManager.launchTask(OFFER_2.getOffer().getId(), TASK_INFO);
+
+    control.replay();
+
+    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+    assertEquals(
+        ImmutableSet.of(Tasks.id(TASK)),
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(TASK.getAssignedTask()),
+            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+  }
+
+  @Test
+  public void testAssignerDoesNotReturnOnFirstMismatch() throws Exception {
+    // Ensures scheduling loop does not terminate prematurely when the first mismatch is identified.
+    HostOffer mismatched = new HostOffer(
+        Offer.newBuilder()
+            .setId(OfferID.newBuilder().setValue("offerId0"))
+            .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId"))
+            .setAgentId(AgentID.newBuilder().setValue("slaveId0"))
+            .setHostname("hostName0")
+            .addResources(Resource.newBuilder()
+                .setName("ports")
+                .setType(Type.RANGES)
+                .setRanges(
+                    Ranges.newBuilder().addRange(Range.newBuilder().setBegin(PORT).setEnd(PORT))))
+            .build(),
+        IHostAttributes.build(new HostAttributes()));
+
+    expectNoUpdateReservations(2);
+    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(mismatched, OFFER));
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+    expect(filter.filter(
+        new UnusedResource(
+            bagFromMesosResources(mismatched.getOffer().getResourcesList()),
+            mismatched.getAttributes()),
+        resourceRequest))
+        .andReturn(ImmutableSet.of(Veto.constraintMismatch("constraint mismatch")));
+    offerManager.banOffer(mismatched.getOffer().getId(), GROUP_KEY);
+    expect(filter.filter(
+        new UnusedResource(
+            bagFromMesosResources(MESOS_OFFER.getResourcesList()), OFFER.getAttributes()),
+        resourceRequest))
+        .andReturn(ImmutableSet.of());
+
+    expectAssignTask(MESOS_OFFER);
+    expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER.getOffer()))
+        .andReturn(TASK_INFO);
+    offerManager.launchTask(OFFER.getOffer().getId(), TASK_INFO);
+
+    control.replay();
+
+    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+    assertEquals(
+        ImmutableSet.of(Tasks.id(TASK)),
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(TASK.getAssignedTask()),
+            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+    assertEquals(2L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+  }
+
+  @Test
+  public void testResourceMapperCallback() {
+    AssignedTask builder = TASK.newBuilder().getAssignedTask();
+    builder.unsetAssignedPorts();
+
+    control.replay();
+
+    assertEquals(
+        TASK.getAssignedTask(),
+        assigner.mapAndAssignResources(MESOS_OFFER, IAssignedTask.build(builder)));
+  }
+
+  @Test
+  public void testAssignToReservedAgent() throws Exception {
+    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
+    expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
+    updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY);
+    expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER));
+    expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
+    expectAssignTask(MESOS_OFFER);
+    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+
+    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER))
+        .andReturn(TASK_INFO);
+
+    control.replay();
+
+    AttributeAggregate aggregate = empty();
+    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+    assertEquals(
+        ImmutableSet.of(Tasks.id(TASK)),
+        assigner.maybeAssign(
+            storeProvider,
+            new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate),
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(
+                TASK.getAssignedTask()),
+            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+    assertNotEquals(empty(), aggregate);
+    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+  }
+
+  @Test
+  public void testAssignReservedAgentWhenOfferNotReady() throws Exception {
+    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
+    expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
+    expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER));
+    expect(filter.filter(UNUSED, resourceRequest))
+        .andReturn(ImmutableSet.of(Veto.insufficientResources("cpu", 1)));
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+    offerManager.banOffer(MESOS_OFFER.getId(), GROUP_KEY);
+    expectLastCall();
+
+    control.replay();
+
+    AttributeAggregate aggregate = empty();
+    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+    assertEquals(
+        ImmutableSet.of(),
+        assigner.maybeAssign(
+            storeProvider,
+            new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate),
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(TASK.getAssignedTask()),
+            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+    assertEquals(empty(), aggregate);
+    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+  }
+
+  @Test
+  public void testAssignWithMixOfReservedAndNotReserved() throws Exception {
+    AttributeAggregate aggregate = empty();
+    ResourceRequest resources = new ResourceRequest(
+        TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate);
+    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
+    expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
+    updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY);
+    expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER));
+    expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
+    expectAssignTask(MESOS_OFFER);
+    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+
+    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER))
+        .andReturn(TASK_INFO);
+
+    // Normal scheduling loop for the remaining task...
+    expect(updateAgentReserver.getAgent(InstanceKeys.from(JOB, 9999))).andReturn(Optional.absent());
+    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
+    expect(updateAgentReserver.getReservations(OFFER.getOffer().getAgentId().getValue()))
+        .andReturn(ImmutableSet.of());
+    expect(filter.filter(UNUSED, resources))
+        .andReturn(ImmutableSet.of(Veto.constraintMismatch("lol")));
+    offerManager.banOffer(MESOS_OFFER.getId(), GROUP_KEY);
+
+    control.replay();
+
+    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+    assertEquals(
+        ImmutableSet.of(Tasks.id(TASK)),
+        assigner.maybeAssign(
+            storeProvider,
+            resources,
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(
+                TASK.getAssignedTask(),
+                makeTask("another-task", JOB, 9999).getAssignedTask()),
+            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+    assertNotEquals(empty(), aggregate);
+    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+  }
+
+  private void expectAssignTask(Offer offer) {
+    expect(stateManager.assignTask(
+        eq(storeProvider),
+        eq(Tasks.id(TASK)),
+        eq(offer.getHostname()),
+        eq(offer.getAgentId()),
+        anyObject())).andReturn(TASK.getAssignedTask());
+  }
+
+  private void expectNoUpdateReservations(int offers) {
+    expect(updateAgentReserver.hasReservations(anyObject())).andReturn(false);
+    for (int i = 0; i < offers; i++) {
+      expect(updateAgentReserver.getReservations(anyString())).andReturn(ImmutableSet.of());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/92f6d9f6/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
deleted file mode 100644
index 11835dc..0000000
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
+++ /dev/null
@@ -1,516 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.Attribute;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.JobKey;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.TierManager;
-import org.apache.aurora.scheduler.base.InstanceKeys;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.filter.SchedulingFilter;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
-import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
-import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.resources.ResourceBag;
-import org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.testing.FakeStatsProvider;
-import org.apache.aurora.scheduler.updater.UpdateAgentReserver;
-import org.apache.mesos.v1.Protos.AgentID;
-import org.apache.mesos.v1.Protos.FrameworkID;
-import org.apache.mesos.v1.Protos.OfferID;
-import org.apache.mesos.v1.Protos.Resource;
-import org.apache.mesos.v1.Protos.TaskID;
-import org.apache.mesos.v1.Protos.TaskInfo;
-import org.apache.mesos.v1.Protos.Value.Range;
-import org.apache.mesos.v1.Protos.Value.Ranges;
-import org.apache.mesos.v1.Protos.Value.Type;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.ScheduleStatus.LOST;
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
-import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources;
-import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
-import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer;
-import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
-import static org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl.ASSIGNER_EVALUATED_OFFERS;
-import static org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl.ASSIGNER_LAUNCH_FAILURES;
-import static org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl.LAUNCH_FAILED_MSG;
-import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import static org.apache.mesos.v1.Protos.Offer;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.anyString;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-public class TaskAssignerImplTest extends EasyMockTest {
-
-  private static final int PORT = 1000;
-  private static final Offer MESOS_OFFER = offer(mesosRange(PORTS, PORT));
-  private static final String SLAVE_ID = MESOS_OFFER.getAgentId().getValue();
-  private static final HostOffer OFFER =
-      new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes()
-          .setHost(MESOS_OFFER.getHostname())
-          .setAttributes(ImmutableSet.of(
-              new Attribute("host", ImmutableSet.of(MESOS_OFFER.getHostname()))))));
-  private static final IScheduledTask TASK = makeTask("id", JOB);
-  private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask());
-  private static final TaskInfo TASK_INFO = TaskInfo.newBuilder()
-      .setName("taskName")
-      .setTaskId(TaskID.newBuilder().setValue(Tasks.id(TASK)))
-      .setAgentId(MESOS_OFFER.getAgentId())
-      .build();
-  private static final IInstanceKey INSTANCE_KEY =
-      InstanceKeys.from(JOB, TASK.getAssignedTask().getInstanceId());
-  private static final Map<String, TaskGroupKey> NO_RESERVATION = ImmutableMap.of();
-  private static final UnusedResource UNUSED = new UnusedResource(
-      bagFromMesosResources(MESOS_OFFER.getResourcesList()),
-      OFFER.getAttributes());
-  private static final HostOffer OFFER_2 = new HostOffer(
-      Offer.newBuilder()
-          .setId(OfferID.newBuilder().setValue("offerId0"))
-              .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId"))
-              .setAgentId(AgentID.newBuilder().setValue("slaveId0"))
-              .setHostname("hostName0")
-          .addResources(Resource.newBuilder()
-          .setName("ports")
-          .setType(Type.RANGES)
-          .setRanges(
-              Ranges.newBuilder().addRange(Range.newBuilder().setBegin(PORT).setEnd(PORT))))
-              .build(),
-      IHostAttributes.build(new HostAttributes()));
-
-  private static final Set<String> NO_ASSIGNMENT = ImmutableSet.of();
-
-  private ResourceRequest resourceRequest;
-
-  private MutableStoreProvider storeProvider;
-  private StateManager stateManager;
-  private SchedulingFilter filter;
-  private MesosTaskFactory taskFactory;
-  private OfferManager offerManager;
-  private TaskAssignerImpl assigner;
-  private TierManager tierManager;
-  private FakeStatsProvider statsProvider;
-  private UpdateAgentReserver updateAgentReserver;
-
-  @Before
-  public void setUp() throws Exception {
-    storeProvider = createMock(MutableStoreProvider.class);
-    filter = createMock(SchedulingFilter.class);
-    taskFactory = createMock(MesosTaskFactory.class);
-    stateManager = createMock(StateManager.class);
-    offerManager = createMock(OfferManager.class);
-    tierManager = createMock(TierManager.class);
-    updateAgentReserver = createMock(UpdateAgentReserver.class);
-    statsProvider = new FakeStatsProvider();
-    assigner = new TaskAssignerImpl(
-        stateManager,
-        filter,
-        taskFactory,
-        offerManager,
-        tierManager,
-        updateAgentReserver,
-        statsProvider);
-    resourceRequest = new ResourceRequest(
-        TASK.getAssignedTask().getTask(),
-        ResourceBag.EMPTY,
-        empty());
-  }
-
-  @Test
-  public void testAssignNoTasks() throws Exception {
-    control.replay();
-
-    assertEquals(
-        NO_ASSIGNMENT,
-        assigner.maybeAssign(storeProvider, null, null, ImmutableSet.of(), null));
-  }
-
-  @Test
-  public void testAssignPartialNoVetoes() throws Exception {
-    expectNoUpdateReservations(1);
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
-    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
-    expectAssignTask(MESOS_OFFER);
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER))
-        .andReturn(TASK_INFO);
-
-    control.replay();
-
-    AttributeAggregate aggregate = empty();
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        ImmutableSet.of(Tasks.id(TASK)),
-        assigner.maybeAssign(
-            storeProvider,
-            new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate),
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(
-                TASK.getAssignedTask(),
-                makeTask("id2", JOB).getAssignedTask(),
-                makeTask("id3", JOB).getAssignedTask()),
-            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
-    assertNotEquals(empty(), aggregate);
-    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testAssignVetoesWithStaticBan() throws Exception {
-    expectNoUpdateReservations(1);
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
-    offerManager.banOffer(MESOS_OFFER.getId(), GROUP_KEY);
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(filter.filter(UNUSED, resourceRequest))
-        .andReturn(ImmutableSet.of(Veto.constraintMismatch("denied")));
-
-    control.replay();
-
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        NO_ASSIGNMENT,
-        assigner.maybeAssign(
-            storeProvider,
-            resourceRequest,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(TASK.getAssignedTask()),
-            NO_RESERVATION));
-    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testAssignVetoesWithNoStaticBan() throws Exception {
-    expectNoUpdateReservations(1);
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(filter.filter(UNUSED, resourceRequest))
-        .andReturn(ImmutableSet.of(Veto.unsatisfiedLimit("limit")));
-
-    control.replay();
-
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        NO_ASSIGNMENT,
-        assigner.maybeAssign(
-            storeProvider,
-            resourceRequest,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(TASK.getAssignedTask()),
-            NO_RESERVATION));
-    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testAssignmentClearedOnError() throws Exception {
-    expectNoUpdateReservations(1);
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER, OFFER_2));
-    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
-    expectLastCall().andThrow(new OfferManager.LaunchException("expected"));
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
-    expectAssignTask(MESOS_OFFER);
-    expect(stateManager.changeState(
-        storeProvider,
-        Tasks.id(TASK),
-        Optional.of(PENDING),
-        LOST,
-        LAUNCH_FAILED_MSG))
-        .andReturn(StateChangeResult.SUCCESS);
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER))
-        .andReturn(TASK_INFO);
-
-    control.replay();
-
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_LAUNCH_FAILURES));
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    // Ensures scheduling loop terminates on the first launch failure.
-    assertEquals(
-        NO_ASSIGNMENT,
-        assigner.maybeAssign(
-            storeProvider,
-            resourceRequest,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(
-                TASK.getAssignedTask(),
-                makeTask("id2", JOB).getAssignedTask(),
-                makeTask("id3", JOB).getAssignedTask()),
-            NO_RESERVATION));
-    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_LAUNCH_FAILURES));
-    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testAssignmentSkippedForReservedSlave() throws Exception {
-    expectNoUpdateReservations(0);
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
-
-    control.replay();
-
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        NO_ASSIGNMENT,
-        assigner.maybeAssign(
-            storeProvider,
-            resourceRequest,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(TASK.getAssignedTask()),
-            ImmutableMap.of(SLAVE_ID, TaskGroupKey.from(
-                ITaskConfig.build(new TaskConfig().setJob(new JobKey("other", "e", "n")))))));
-    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testTaskWithReservedSlaveLandsElsewhere() throws Exception {
-    // Ensures slave/task reservation relationship is only enforced in slave->task direction
-    // and permissive in task->slave direction. In other words, a task with a slave reservation
-    // should still be tried against other unreserved slaves.
-    expectNoUpdateReservations(1);
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER_2, OFFER));
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(filter.filter(
-        new UnusedResource(
-            bagFromMesosResources(OFFER_2.getOffer().getResourcesList()),
-            OFFER_2.getAttributes()),
-        resourceRequest)).andReturn(ImmutableSet.of());
-    expectAssignTask(OFFER_2.getOffer());
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER_2.getOffer()))
-        .andReturn(TASK_INFO);
-    offerManager.launchTask(OFFER_2.getOffer().getId(), TASK_INFO);
-
-    control.replay();
-
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        ImmutableSet.of(Tasks.id(TASK)),
-        assigner.maybeAssign(
-            storeProvider,
-            resourceRequest,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(TASK.getAssignedTask()),
-            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
-    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testAssignerDoesNotReturnOnFirstMismatch() throws Exception {
-    // Ensures scheduling loop does not terminate prematurely when the first mismatch is identified.
-    HostOffer mismatched = new HostOffer(
-        Offer.newBuilder()
-            .setId(OfferID.newBuilder().setValue("offerId0"))
-            .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId"))
-            .setAgentId(AgentID.newBuilder().setValue("slaveId0"))
-            .setHostname("hostName0")
-            .addResources(Resource.newBuilder()
-                .setName("ports")
-                .setType(Type.RANGES)
-                .setRanges(
-                    Ranges.newBuilder().addRange(Range.newBuilder().setBegin(PORT).setEnd(PORT))))
-            .build(),
-        IHostAttributes.build(new HostAttributes()));
-
-    expectNoUpdateReservations(2);
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(mismatched, OFFER));
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(filter.filter(
-        new UnusedResource(
-            bagFromMesosResources(mismatched.getOffer().getResourcesList()),
-            mismatched.getAttributes()),
-        resourceRequest))
-        .andReturn(ImmutableSet.of(Veto.constraintMismatch("constraint mismatch")));
-    offerManager.banOffer(mismatched.getOffer().getId(), GROUP_KEY);
-    expect(filter.filter(
-        new UnusedResource(
-            bagFromMesosResources(MESOS_OFFER.getResourcesList()), OFFER.getAttributes()),
-        resourceRequest))
-        .andReturn(ImmutableSet.of());
-
-    expectAssignTask(MESOS_OFFER);
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER.getOffer()))
-        .andReturn(TASK_INFO);
-    offerManager.launchTask(OFFER.getOffer().getId(), TASK_INFO);
-
-    control.replay();
-
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        ImmutableSet.of(Tasks.id(TASK)),
-        assigner.maybeAssign(
-            storeProvider,
-            resourceRequest,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(TASK.getAssignedTask()),
-            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
-    assertEquals(2L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testResourceMapperCallback() {
-    AssignedTask builder = TASK.newBuilder().getAssignedTask();
-    builder.unsetAssignedPorts();
-
-    control.replay();
-
-    assertEquals(
-        TASK.getAssignedTask(),
-        assigner.mapAndAssignResources(MESOS_OFFER, IAssignedTask.build(builder)));
-  }
-
-  @Test
-  public void testAssignToReservedAgent() throws Exception {
-    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
-    expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
-    updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY);
-    expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER));
-    expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
-    expectAssignTask(MESOS_OFFER);
-    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER))
-        .andReturn(TASK_INFO);
-
-    control.replay();
-
-    AttributeAggregate aggregate = empty();
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        ImmutableSet.of(Tasks.id(TASK)),
-        assigner.maybeAssign(
-            storeProvider,
-            new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate),
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(
-                TASK.getAssignedTask()),
-            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
-    assertNotEquals(empty(), aggregate);
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testAssignReservedAgentWhenOfferNotReady() throws Exception {
-    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
-    expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
-    expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER));
-    expect(filter.filter(UNUSED, resourceRequest))
-        .andReturn(ImmutableSet.of(Veto.insufficientResources("cpu", 1)));
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    offerManager.banOffer(MESOS_OFFER.getId(), GROUP_KEY);
-    expectLastCall();
-
-    control.replay();
-
-    AttributeAggregate aggregate = empty();
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        ImmutableSet.of(),
-        assigner.maybeAssign(
-            storeProvider,
-            new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate),
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(TASK.getAssignedTask()),
-            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
-    assertEquals(empty(), aggregate);
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testAssignWithMixOfReservedAndNotReserved() throws Exception {
-    AttributeAggregate aggregate = empty();
-    ResourceRequest resources = new ResourceRequest(
-        TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate);
-    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
-    expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
-    updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY);
-    expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER));
-    expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
-    expectAssignTask(MESOS_OFFER);
-    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER))
-        .andReturn(TASK_INFO);
-
-    // Normal scheduling loop for the remaining task...
-    expect(updateAgentReserver.getAgent(InstanceKeys.from(JOB, 9999))).andReturn(Optional.absent());
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
-    expect(updateAgentReserver.getReservations(OFFER.getOffer().getAgentId().getValue()))
-        .andReturn(ImmutableSet.of());
-    expect(filter.filter(UNUSED, resources))
-        .andReturn(ImmutableSet.of(Veto.constraintMismatch("lol")));
-    offerManager.banOffer(MESOS_OFFER.getId(), GROUP_KEY);
-
-    control.replay();
-
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        ImmutableSet.of(Tasks.id(TASK)),
-        assigner.maybeAssign(
-            storeProvider,
-            resources,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(
-                TASK.getAssignedTask(),
-                makeTask("another-task", JOB, 9999).getAssignedTask()),
-            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
-    assertNotEquals(empty(), aggregate);
-    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  private void expectAssignTask(Offer offer) {
-    expect(stateManager.assignTask(
-        eq(storeProvider),
-        eq(Tasks.id(TASK)),
-        eq(offer.getHostname()),
-        eq(offer.getAgentId()),
-        anyObject())).andReturn(TASK.getAssignedTask());
-  }
-
-  private void expectNoUpdateReservations(int offers) {
-    expect(updateAgentReserver.hasReservations(anyObject())).andReturn(false);
-    for (int i = 0; i < offers; i++) {
-      expect(updateAgentReserver.getReservations(anyString())).andReturn(ImmutableSet.of());
-    }
-  }
-}


Mime
View raw message