aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [1/8] aurora git commit: Break apart async package and AsyncModule into purpose-specific equivalents.
Date Wed, 22 Jul 2015 19:39:58 GMT
Repository: aurora
Updated Branches:
  refs/heads/master 6e2bf577f -> 0070a5fd1


http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java
new file mode 100644
index 0000000..97d25f9
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.reconciliation;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.state.StateChangeResult;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
+import static org.apache.aurora.gen.ScheduleStatus.INIT;
+import static org.apache.aurora.gen.ScheduleStatus.KILLED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.STARTING;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+
+public class TaskTimeoutTest extends EasyMockTest {
+
+  private static final String TASK_ID = "task_id";
+  private static final Amount<Long, Time> TIMEOUT = Amount.of(1L, Time.MINUTES);
+
+  private AtomicLong timedOutTaskCounter;
+  private ScheduledExecutorService executor;
+  private StorageTestUtil storageUtil;
+  private ScheduledFuture<?> future;
+  private StateManager stateManager;
+  private FakeClock clock;
+  private TaskTimeout timeout;
+  private StatsProvider statsProvider;
+
+  @Before
+  public void setUp() {
+    executor = createMock(ScheduledExecutorService.class);
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+    future = createMock(new Clazz<ScheduledFuture<?>>() { });
+    stateManager = createMock(StateManager.class);
+    clock = new FakeClock();
+    statsProvider = createMock(StatsProvider.class);
+    timedOutTaskCounter = new AtomicLong();
+    expect(statsProvider.makeCounter(TaskTimeout.TIMED_OUT_TASKS_COUNTER))
+        .andReturn(timedOutTaskCounter);
+  }
+
+  private void replayAndCreate() {
+    control.replay();
+    timeout = new TaskTimeout(
+        executor,
+        storageUtil.storage,
+        stateManager,
+        TIMEOUT,
+        statsProvider);
+    timeout.startAsync().awaitRunning();
+  }
+
+  private Capture<Runnable> expectTaskWatch(Amount<Long, Time> expireIn) {
+    Capture<Runnable> capture = createCapture();
+    executor.schedule(
+        EasyMock.capture(capture),
+        eq((long) expireIn.getValue()),
+        eq(expireIn.getUnit().getTimeUnit()));
+    expectLastCall().andReturn(future);
+    return capture;
+  }
+
+  private Capture<Runnable> expectTaskWatch() {
+    return expectTaskWatch(TIMEOUT);
+  }
+
+  private void changeState(String taskId, ScheduleStatus from, ScheduleStatus to) {
+    IScheduledTask task = IScheduledTask.build(new ScheduledTask()
+        .setStatus(to)
+        .setAssignedTask(new AssignedTask().setTaskId(taskId)));
+    timeout.recordStateChange(TaskStateChange.transition(task, from));
+  }
+
+  private void changeState(ScheduleStatus from, ScheduleStatus to) {
+    changeState(TASK_ID, from, to);
+  }
+
+  @Test
+  public void testNormalTransitions() {
+    expectTaskWatch();
+    expectTaskWatch();
+
+    replayAndCreate();
+
+    changeState(INIT, PENDING);
+    changeState(PENDING, ASSIGNED);
+    changeState(ASSIGNED, STARTING);
+    changeState(STARTING, RUNNING);
+    changeState(RUNNING, KILLING);
+    changeState(KILLING, KILLED);
+  }
+
+  @Test
+  public void testTransientToTransient() {
+    expectTaskWatch();
+    Capture<Runnable> killingTimeout = expectTaskWatch();
+    expect(stateManager.changeState(
+        storageUtil.mutableStoreProvider,
+        TASK_ID,
+        Optional.of(KILLING),
+        LOST,
+        TaskTimeout.TIMEOUT_MESSAGE))
+        .andReturn(StateChangeResult.SUCCESS);
+
+    replayAndCreate();
+
+    changeState(PENDING, ASSIGNED);
+    changeState(ASSIGNED, KILLING);
+    killingTimeout.getValue().run();
+  }
+
+  @Test
+  public void testTimeout() throws Exception {
+    Capture<Runnable> assignedTimeout = expectTaskWatch();
+    expect(stateManager.changeState(
+        storageUtil.mutableStoreProvider,
+        TASK_ID,
+        Optional.of(ASSIGNED),
+        LOST,
+        TaskTimeout.TIMEOUT_MESSAGE))
+        .andReturn(StateChangeResult.SUCCESS);
+
+    replayAndCreate();
+
+    changeState(INIT, PENDING);
+    changeState(PENDING, ASSIGNED);
+    assignedTimeout.getValue().run();
+    assertEquals(timedOutTaskCounter.intValue(), 1);
+  }
+
+  @Test
+  public void testTaskDeleted() throws Exception {
+    Capture<Runnable> assignedTimeout = expectTaskWatch();
+    expect(stateManager.changeState(
+        storageUtil.mutableStoreProvider,
+        TASK_ID,
+        Optional.of(KILLING),
+        LOST,
+        TaskTimeout.TIMEOUT_MESSAGE))
+        .andReturn(StateChangeResult.ILLEGAL);
+
+    replayAndCreate();
+
+    changeState(INIT, PENDING);
+    changeState(PENDING, KILLING);
+    assignedTimeout.getValue().run();
+    assertEquals(timedOutTaskCounter.intValue(), 0);
+  }
+
+  private static IScheduledTask makeTask(
+      String taskId,
+      ScheduleStatus status,
+      long stateEnteredMs) {
+
+    return IScheduledTask.build(new ScheduledTask()
+        .setStatus(status)
+        .setTaskEvents(ImmutableList.of(new TaskEvent(stateEnteredMs, status)))
+        .setAssignedTask(new AssignedTask()
+            .setTaskId(taskId)
+            .setTask(new TaskConfig())));
+  }
+
+  @Test
+  public void testStorageStart() {
+    expectTaskWatch(TIMEOUT);
+    expectTaskWatch(TIMEOUT);
+    expectTaskWatch(TIMEOUT);
+
+    replayAndCreate();
+
+    clock.setNowMillis(TIMEOUT.as(Time.MILLISECONDS) * 2);
+    for (IScheduledTask task : ImmutableList.of(
+        makeTask("a", ASSIGNED, 0),
+        makeTask("b", KILLING, TIMEOUT.as(Time.MILLISECONDS)),
+        makeTask("c", PREEMPTING, clock.nowMillis() + TIMEOUT.as(Time.MILLISECONDS)))) {
+
+      timeout.recordStateChange(TaskStateChange.initialized(task));
+    }
+
+    changeState("a", ASSIGNED, RUNNING);
+    changeState("b", KILLING, KILLED);
+    changeState("c", PREEMPTING, FINISHED);
+  }
+
+  @Test
+  public void testTimeoutWhileNotStarted() throws Exception {
+    // Since the timeout is never instructed to start, it should not attempt to transition tasks,
+    // but it should try again later.
+    Capture<Runnable> assignedTimeout = expectTaskWatch();
+    expectTaskWatch(TaskTimeout.NOT_STARTED_RETRY);
+
+    control.replay();
+    timeout = new TaskTimeout(executor, storageUtil.storage, stateManager, TIMEOUT, statsProvider);
+
+    changeState(INIT, PENDING);
+    changeState(PENDING, ASSIGNED);
+    assignedTimeout.getValue().run();
+    assertEquals(timedOutTaskCounter.intValue(), 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculatorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculatorImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculatorImplTest.java
new file mode 100644
index 0000000..7936b29
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculatorImplTest.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.BackoffStrategy;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.scheduling.RescheduleCalculator.RescheduleCalculatorImpl;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
+import static org.apache.aurora.gen.ScheduleStatus.INIT;
+import static org.apache.aurora.gen.ScheduleStatus.KILLED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+public class RescheduleCalculatorImplTest extends EasyMockTest {
+
+  private static final Amount<Long, Time> FLAPPING_THRESHOLD = Amount.of(1L, Time.MINUTES);
+  private static final Amount<Integer, Time> MAX_STARTUP_DELAY = Amount.of(10, Time.MINUTES);
+
+  private StorageTestUtil storageUtil;
+  private BackoffStrategy backoff;
+  private RescheduleCalculator rescheduleCalculator;
+
+  @Before
+  public void setUp() {
+    storageUtil = new StorageTestUtil(this);
+    backoff = createMock(BackoffStrategy.class);
+    rescheduleCalculator = new RescheduleCalculatorImpl(
+        storageUtil.storage,
+        new RescheduleCalculatorImpl.RescheduleCalculatorSettings(
+            backoff,
+            FLAPPING_THRESHOLD,
+            MAX_STARTUP_DELAY));
+    storageUtil.expectOperations();
+  }
+
+  @Test
+  public void testNoPenaltyForNoAncestor() {
+    control.replay();
+
+    assertEquals(0L, rescheduleCalculator.getFlappingPenaltyMs(makeTask("a", INIT)));
+  }
+
+  @Test
+  public void testNoPenaltyDeletedAncestor() {
+    String ancestorId = "a";
+    storageUtil.expectTaskFetch(Query.taskScoped(ancestorId));
+
+    control.replay();
+
+    assertEquals(
+        0L,
+        rescheduleCalculator.getFlappingPenaltyMs(setAncestor(makeTask("b", INIT), ancestorId)));
+  }
+
+  @Test
+  public void testFlappingTask() {
+    IScheduledTask ancestor = makeFlappyTask("a");
+    storageUtil.expectTaskFetch(Query.taskScoped(Tasks.id(ancestor)), ancestor);
+    long penaltyMs = 1000L;
+    expect(backoff.calculateBackoffMs(0L)).andReturn(penaltyMs);
+
+    control.replay();
+
+    assertEquals(
+        penaltyMs,
+        rescheduleCalculator.getFlappingPenaltyMs(
+            setAncestor(makeTask("b", INIT), Tasks.id(ancestor))));
+  }
+
+  @Test
+  public void testFlappingTasksBackoffTruncation() {
+    // Ensures that the reschedule calculator detects penalty truncation and avoids inspecting
+    // ancestors once truncated.
+    IScheduledTask taskA = setAncestor(makeFlappyTask("a"), "bugIfQueried");
+    IScheduledTask taskB = setAncestor(makeFlappyTask("b"), Tasks.id(taskA));
+    IScheduledTask taskC = setAncestor(makeFlappyTask("c"), Tasks.id(taskB));
+    IScheduledTask taskD = setAncestor(makeFlappyTask("d"), Tasks.id(taskC));
+
+    Map<IScheduledTask, Long> ancestorsAndPenalties = ImmutableMap.of(
+        taskD, 100L,
+        taskC, 200L,
+        taskB, 300L,
+        taskA, 300L);
+
+    long lastPenalty = 0L;
+    for (Map.Entry<IScheduledTask, Long> taskAndPenalty : ancestorsAndPenalties.entrySet()) {
+      storageUtil.expectTaskFetch(
+          Query.taskScoped(Tasks.id(taskAndPenalty.getKey())),
+          taskAndPenalty.getKey());
+      expect(backoff.calculateBackoffMs(lastPenalty)).andReturn(taskAndPenalty.getValue());
+      lastPenalty = taskAndPenalty.getValue();
+    }
+
+    control.replay();
+
+    IScheduledTask newTask = setAncestor(makeFlappyTask("newTask"), Tasks.id(taskD));
+    assertEquals(300L, rescheduleCalculator.getFlappingPenaltyMs(newTask));
+  }
+
+  @Test
+  public void testNoPenaltyForInterruptedTasks() {
+    IScheduledTask ancestor = setEvents(
+        makeTask("a", KILLED),
+        ImmutableMap.of(INIT, 0L, PENDING, 100L, RUNNING, 200L, KILLING, 300L, KILLED, 400L));
+    storageUtil.expectTaskFetch(Query.taskScoped(Tasks.id(ancestor)), ancestor);
+
+    control.replay();
+
+    assertEquals(
+        0L,
+        rescheduleCalculator.getFlappingPenaltyMs(
+            setAncestor(makeTask("b", INIT), Tasks.id(ancestor))));
+  }
+
+  private IScheduledTask makeFlappyTask(String taskId) {
+    return setEvents(
+        makeTask(taskId, FINISHED),
+        ImmutableMap.of(INIT, 0L, PENDING, 100L, RUNNING, 200L, FINISHED, 300L));
+  }
+
+  private IScheduledTask makeTask(String taskId) {
+    return IScheduledTask.build(new ScheduledTask()
+        .setAssignedTask(new AssignedTask()
+            .setInstanceId(0)
+            .setTaskId(taskId)
+            .setTask(new TaskConfig()
+                .setJobName("job-" + taskId)
+                .setOwner(new Identity().setRole("role-" + taskId).setUser("user-" + taskId))
+                .setEnvironment("env-" + taskId))));
+  }
+
+  private IScheduledTask makeTask(String taskId, ScheduleStatus status) {
+    return IScheduledTask.build(makeTask(taskId).newBuilder().setStatus(status));
+  }
+
+  private IScheduledTask setAncestor(IScheduledTask task, String ancestorId) {
+    return IScheduledTask.build(task.newBuilder().setAncestorId(ancestorId));
+  }
+
+  private static final Function<Map.Entry<ScheduleStatus, Long>, TaskEvent> TO_EVENT =
+      new Function<Entry<ScheduleStatus, Long>, TaskEvent>() {
+        @Override
+        public TaskEvent apply(Entry<ScheduleStatus, Long> input) {
+          return new TaskEvent().setStatus(input.getKey()).setTimestamp(input.getValue());
+        }
+      };
+
+  private IScheduledTask setEvents(IScheduledTask task, Map<ScheduleStatus, Long> events) {
+    return IScheduledTask.build(task.newBuilder().setTaskEvents(
+        FluentIterable.from(events.entrySet()).transform(TO_EVENT).toList()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
new file mode 100644
index 0000000..10de372
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.RateLimiter;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.BackoffStrategy;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import static org.apache.aurora.gen.ScheduleStatus.INIT;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+
+public class TaskGroupsTest extends EasyMockTest {
+
+  private static final long FIRST_SCHEDULE_DELAY_MS = 1L;
+
+  private ScheduledExecutorService executor;
+  private BackoffStrategy backoffStrategy;
+  private TaskScheduler taskScheduler;
+  private RateLimiter rateLimiter;
+
+  private TaskGroups taskGroups;
+
+  @Before
+  public void setUp() throws Exception {
+    executor = createMock(ScheduledExecutorService.class);
+    backoffStrategy = createMock(BackoffStrategy.class);
+    taskScheduler = createMock(TaskScheduler.class);
+    rateLimiter = createMock(RateLimiter.class);
+    taskGroups = new TaskGroups(
+        executor,
+        Amount.of(FIRST_SCHEDULE_DELAY_MS, Time.MILLISECONDS),
+        backoffStrategy,
+        rateLimiter,
+        taskScheduler,
+        createMock(RescheduleCalculator.class));
+  }
+
+  @Test
+  public void testEvaluatedAfterFirstSchedulePenalty() {
+    executor.schedule(
+        EasyMock.<Runnable>anyObject(),
+        EasyMock.eq(FIRST_SCHEDULE_DELAY_MS),
+        EasyMock.eq(MILLISECONDS));
+    expectLastCall().andAnswer(new IAnswer<ScheduledFuture<Void>>() {
+      @Override
+      public ScheduledFuture<Void> answer() {
+        ((Runnable) EasyMock.getCurrentArguments()[0]).run();
+        return null;
+      }
+    });
+    expect(rateLimiter.acquire()).andReturn(0D);
+    expect(taskScheduler.schedule("a")).andReturn(true);
+
+    control.replay();
+
+    taskGroups.taskChangedState(TaskStateChange.transition(makeTask("a"), INIT));
+  }
+
+  private Capture<Runnable> expectEvaluate() {
+    Capture<Runnable> capture = createCapture();
+    executor.schedule(
+        EasyMock.capture(capture),
+        EasyMock.eq(FIRST_SCHEDULE_DELAY_MS),
+        EasyMock.eq(MILLISECONDS));
+    expectLastCall().andReturn(null);
+    return capture;
+  }
+
+  @Test
+  public void testTaskDeletedBeforeEvaluating() {
+    final IScheduledTask task = makeTask("a");
+
+    Capture<Runnable> evaluate = expectEvaluate();
+
+    expect(rateLimiter.acquire()).andReturn(0D);
+    expect(taskScheduler.schedule(Tasks.id(task))).andAnswer(new IAnswer<Boolean>() {
+      @Override
+      public Boolean answer() {
+        // Test a corner case where a task is deleted while it is being evaluated by the task
+        // scheduler.  If not handled carefully, this could result in the scheduler trying again
+        // later to satisfy the deleted task.
+        taskGroups.tasksDeleted(new TasksDeleted(ImmutableSet.of(task)));
+
+        return false;
+      }
+    });
+    expect(backoffStrategy.calculateBackoffMs(FIRST_SCHEDULE_DELAY_MS)).andReturn(0L);
+
+    control.replay();
+
+    taskGroups.taskChangedState(TaskStateChange.transition(makeTask(Tasks.id(task)), INIT));
+    evaluate.getValue().run();
+  }
+
+  private static IScheduledTask makeTask(String id) {
+    return IScheduledTask.build(new ScheduledTask()
+        .setStatus(ScheduleStatus.PENDING)
+        .setAssignedTask(new AssignedTask()
+            .setTaskId(id)
+            .setTask(new TaskConfig()
+                .setOwner(new Identity("owner", "owner"))
+                .setEnvironment("test")
+                .setJobName("job"))));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
new file mode 100644
index 0000000..25d8da6
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
@@ -0,0 +1,342 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.TypeLiteral;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.offers.Offers;
+import org.apache.aurora.scheduler.preemptor.BiCache;
+import org.apache.aurora.scheduler.preemptor.Preemptor;
+import org.apache.aurora.scheduler.scheduling.TaskScheduler.TaskSchedulerImpl;
+import org.apache.aurora.scheduler.state.PubsubTestUtil;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.state.TaskAssigner;
+import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
+import org.apache.aurora.scheduler.state.TaskAssigner.Assignment.Result;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.apache.mesos.Protos.TaskInfo;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IExpectationSetters;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TaskSchedulerImplTest extends EasyMockTest {
+
+  private static final IScheduledTask TASK_A =
+      TaskTestUtil.makeTask("a", JobKeys.from("a", "a", "a"));
+  private static final IScheduledTask TASK_B =
+      TaskTestUtil.makeTask("b", JobKeys.from("b", "b", "b"));
+  private static final HostOffer OFFER = new HostOffer(
+      Offers.makeOffer("OFFER_A", "HOST_A"),
+      IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE)));
+
+  private static final String SLAVE_ID = OFFER.getOffer().getSlaveId().getValue();
+
+  private static final TaskGroupKey GROUP_A = TaskGroupKey.from(TASK_A.getAssignedTask().getTask());
+  private static final TaskGroupKey GROUP_B = TaskGroupKey.from(TASK_B.getAssignedTask().getTask());
+
+  private StorageTestUtil storageUtil;
+  private StateManager stateManager;
+  private TaskAssigner assigner;
+  private OfferManager offerManager;
+  private TaskScheduler scheduler;
+  private Preemptor preemptor;
+  private BiCache<String, TaskGroupKey> reservations;
+  private EventSink eventSink;
+
+  @Before
+  public void setUp() throws Exception {
+    storageUtil = new StorageTestUtil(this);
+    stateManager = createMock(StateManager.class);
+    assigner = createMock(TaskAssigner.class);
+    offerManager = createMock(OfferManager.class);
+    preemptor = createMock(Preemptor.class);
+    reservations = createMock(new Clazz<BiCache<String, TaskGroupKey>>() { });
+
+    Injector injector = getInjector(storageUtil.storage);
+    scheduler = injector.getInstance(TaskScheduler.class);
+    eventSink = PubsubTestUtil.startPubsub(injector);
+  }
+
+  private Injector getInjector(final Storage storageImpl) {
+    return Guice.createInjector(
+        new PubsubEventModule(false),
+        new AbstractModule() {
+          @Override
+          protected void configure() {
+            bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).toInstance(reservations);
+            bind(TaskScheduler.class).to(TaskSchedulerImpl.class);
+            bind(Preemptor.class).toInstance(preemptor);
+            bind(OfferManager.class).toInstance(offerManager);
+            bind(StateManager.class).toInstance(stateManager);
+            bind(TaskAssigner.class).toInstance(assigner);
+            bind(Clock.class).toInstance(createMock(Clock.class));
+            bind(StatsProvider.class).toInstance(new FakeStatsProvider());
+            bind(Storage.class).toInstance(storageImpl);
+            PubsubEventModule.bindSubscriber(binder(), TaskScheduler.class);
+          }
+        });
+  }
+
+  private void expectTaskStillPendingQuery(IScheduledTask task) {
+    storageUtil.expectTaskFetch(
+        Query.taskScoped(Tasks.id(task)).byStatus(PENDING),
+        ImmutableSet.of(task));
+  }
+
+  private void expectAssigned(IScheduledTask task) {
+    expect(assigner.maybeAssign(
+        storageUtil.mutableStoreProvider,
+        OFFER,
+        new ResourceRequest(task.getAssignedTask().getTask(), EMPTY),
+        Tasks.id(task))).andReturn(Assignment.success(TaskInfo.getDefaultInstance()));
+  }
+
+  @Test
+  public void testReservation() throws Exception {
+    storageUtil.expectOperations();
+
+    expectTaskStillPendingQuery(TASK_A);
+    expectActiveJobFetch(TASK_A);
+    expectLaunchAttempt(false);
+    // Reserve "a" with offerA
+    expectReservationCheck(TASK_A);
+    expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID));
+    expectAddReservation(SLAVE_ID, TASK_A);
+
+    // Use previously created reservation.
+    expectTaskStillPendingQuery(TASK_A);
+    expectActiveJobFetch(TASK_A);
+    expectGetReservation(SLAVE_ID, TASK_A);
+    expectAssigned(TASK_A);
+    AssignmentCapture assignment = expectLaunchAttempt(true);
+
+    control.replay();
+
+    assertFalse(scheduler.schedule("a"));
+    assertTrue(scheduler.schedule("a"));
+    assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, assignment);
+  }
+
+  @Test
+  public void testReservationExpires() throws Exception {
+    storageUtil.expectOperations();
+
+    expectTaskStillPendingQuery(TASK_A);
+    expectActiveJobFetch(TASK_A);
+    expectLaunchAttempt(false);
+    // Reserve "a" with offerA
+    expectReservationCheck(TASK_A);
+    expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID));
+    expectAddReservation(SLAVE_ID, TASK_A);
+
+    // First attempt -> reservation is active.
+    expectTaskStillPendingQuery(TASK_B);
+    expectActiveJobFetch(TASK_B);
+    AssignmentCapture firstAssignment = expectLaunchAttempt(false);
+    expectGetReservation(SLAVE_ID, TASK_A);
+    expectReservationCheck(TASK_B);
+    expectPreemptorCall(TASK_B, Optional.absent());
+
+    // Status changed -> reservation removed.
+    reservations.remove(SLAVE_ID, TaskGroupKey.from(TASK_A.getAssignedTask().getTask()));
+
+    // Second attempt -> reservation expires.
+    expectGetNoReservation(SLAVE_ID);
+    expectTaskStillPendingQuery(TASK_B);
+    expectActiveJobFetch(TASK_B);
+    AssignmentCapture secondAssignment = expectLaunchAttempt(true);
+    expectAssigned(TASK_B);
+
+    control.replay();
+
+    assertFalse(scheduler.schedule("a"));
+    assertFalse(scheduler.schedule("b"));
+    assignAndAssert(Result.FAILURE, GROUP_B, OFFER, firstAssignment);
+
+    eventSink.post(TaskStateChange.transition(assign(TASK_A, SLAVE_ID), PENDING));
+    assertTrue(scheduler.schedule("b"));
+    assignAndAssert(Result.SUCCESS, GROUP_B, OFFER, secondAssignment);
+  }
+
+  @Test
+  public void testReservationUnusable() throws Exception {
+    storageUtil.expectOperations();
+
+    expectTaskStillPendingQuery(TASK_A);
+    expectLaunchAttempt(false);
+    expect(reservations.getByValue(TaskGroupKey.from(TASK_A.getAssignedTask().getTask())))
+        .andReturn(ImmutableSet.of(SLAVE_ID));
+
+    control.replay();
+
+    assertFalse(scheduler.schedule("a"));
+  }
+
+  @Test
+  public void testNonPendingIgnored() throws Exception {
+    control.replay();
+
+    eventSink.post(TaskStateChange.transition(TASK_A, RUNNING));
+  }
+
+  @Test
+  public void testPendingDeletedHandled() throws Exception {
+    control.replay();
+
+    IScheduledTask task = IScheduledTask.build(TASK_A.newBuilder().setStatus(PENDING));
+    eventSink.post(TaskStateChange.transition(task, PENDING));
+  }
+
+  @Test
+  public void testIgnoresThrottledTasks() throws Exception {
+    // Ensures that tasks in THROTTLED state are not considered part of the active job state passed
+    // to the assigner function.
+
+    Storage memStorage = DbUtil.createStorage();
+
+    Injector injector = getInjector(memStorage);
+    scheduler = injector.getInstance(TaskScheduler.class);
+    eventSink = PubsubTestUtil.startPubsub(injector);
+
+    ScheduledTask builder = TASK_A.newBuilder();
+    final IScheduledTask taskA = IScheduledTask.build(builder.setStatus(PENDING));
+    builder.getAssignedTask().setTaskId("b");
+    final IScheduledTask taskB = IScheduledTask.build(builder.setStatus(THROTTLED));
+
+    memStorage.write(new MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(MutableStoreProvider store) {
+        store.getUnsafeTaskStore().saveTasks(ImmutableSet.of(taskA, taskB));
+      }
+    });
+
+    expectGetNoReservation(SLAVE_ID);
+    AssignmentCapture assignment = expectLaunchAttempt(true);
+    expect(assigner.maybeAssign(
+        EasyMock.anyObject(),
+        eq(OFFER),
+        eq(new ResourceRequest(taskA.getAssignedTask().getTask(), EMPTY)),
+        eq(Tasks.id(taskA)))).andReturn(Assignment.success(TaskInfo.getDefaultInstance()));
+
+    control.replay();
+
+    assertTrue(scheduler.schedule(Tasks.id(taskA)));
+    assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, assignment);
+  }
+
+  private static class AssignmentCapture {
+    public Capture<Function<HostOffer, Assignment>> assigner = createCapture();
+    public Capture<TaskGroupKey> groupKey = createCapture();
+  }
+
+  private void expectPreemptorCall(IScheduledTask task, Optional<String> result) {
+    expect(preemptor.attemptPreemptionFor(
+        task.getAssignedTask(),
+        EMPTY,
+        storageUtil.mutableStoreProvider)).andReturn(result);
+  }
+
+  private AssignmentCapture expectLaunchAttempt(boolean taskLaunched)
+      throws OfferManager.LaunchException {
+
+    AssignmentCapture capture = new AssignmentCapture();
+    expect(offerManager.launchFirst(capture(capture.assigner), capture(capture.groupKey)))
+        .andReturn(taskLaunched);
+    return capture;
+  }
+
+  private IScheduledTask assign(IScheduledTask task, String slaveId) {
+    ScheduledTask result = task.newBuilder();
+    result.getAssignedTask().setSlaveId(slaveId);
+    return IScheduledTask.build(result);
+  }
+
+  private void assignAndAssert(
+      Result result,
+      TaskGroupKey groupKey,
+      HostOffer offer,
+      AssignmentCapture capture) {
+
+    assertEquals(result, capture.assigner.getValue().apply(offer).getResult());
+    assertEquals(groupKey, capture.groupKey.getValue());
+  }
+
+  private void expectActiveJobFetch(IScheduledTask task) {
+    storageUtil.expectTaskFetch(
+        Query.jobScoped(Tasks.SCHEDULED_TO_JOB_KEY.apply(task))
+            .byStatus(Tasks.SLAVE_ASSIGNED_STATES),
+        ImmutableSet.of());
+  }
+
+  private void expectAddReservation(String slaveId, IScheduledTask task) {
+    reservations.put(slaveId, TaskGroupKey.from(task.getAssignedTask().getTask()));
+  }
+
+  private IExpectationSetters<?> expectGetReservation(String slaveId, IScheduledTask task) {
+    return expect(reservations.get(slaveId))
+        .andReturn(Optional.of(TaskGroupKey.from(task.getAssignedTask().getTask())));
+  }
+
+  private IExpectationSetters<?> expectGetNoReservation(String slaveId) {
+    return expect(reservations.get(slaveId)).andReturn(Optional.absent());
+  }
+
+  private IExpectationSetters<?> expectReservationCheck(IScheduledTask task) {
+    return expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())))
+        .andReturn(ImmutableSet.of());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerTest.java
new file mode 100644
index 0000000..52af364
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerTest.java
@@ -0,0 +1,671 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.RateLimiter;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.BackoffStrategy;
+
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl;
+import org.apache.aurora.scheduler.offers.OfferManager.OfferReturnDelay;
+import org.apache.aurora.scheduler.offers.Offers;
+import org.apache.aurora.scheduler.preemptor.BiCache;
+import org.apache.aurora.scheduler.preemptor.Preemptor;
+import org.apache.aurora.scheduler.scheduling.TaskScheduler.TaskSchedulerImpl;
+import org.apache.aurora.scheduler.state.MaintenanceController;
+import org.apache.aurora.scheduler.state.StateChangeResult;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.state.TaskAssigner;
+import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.Storage.StorageException;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.mesos.Protos.SlaveID;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IExpectationSetters;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.MaintenanceMode.DRAINED;
+import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
+import static org.apache.aurora.gen.MaintenanceMode.NONE;
+import static org.apache.aurora.gen.MaintenanceMode.SCHEDULED;
+import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
+import static org.apache.aurora.gen.ScheduleStatus.INIT;
+import static org.apache.aurora.gen.ScheduleStatus.KILLED;
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
+import static org.apache.mesos.Protos.Offer;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.isA;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * TODO(wfarner): Break this test up to independently test TaskSchedulerImpl and OfferQueueImpl.
+ */
+public class TaskSchedulerTest extends EasyMockTest {
+
+  private static final long FIRST_SCHEDULE_DELAY_MS = 1L;
+
+  private static final HostOffer OFFER_A =  makeOffer("OFFER_A", "HOST_A", NONE);
+  private static final HostOffer OFFER_B = makeOffer("OFFER_B", "HOST_B", SCHEDULED);
+  private static final HostOffer OFFER_C = makeOffer("OFFER_C", "HOST_C", DRAINING);
+  private static final HostOffer OFFER_D = makeOffer("OFFER_D", "HOST_D", DRAINED);
+  private static final String SLAVE_A = OFFER_A.getOffer().getSlaveId().getValue();
+  private static final String SLAVE_B = OFFER_B.getOffer().getSlaveId().getValue();
+  private static final String SLAVE_C = OFFER_C.getOffer().getSlaveId().getValue();
+
+  private Storage storage;
+
+  private MaintenanceController maintenance;
+  private StateManager stateManager;
+  private TaskAssigner assigner;
+  private BackoffStrategy retryStrategy;
+  private Driver driver;
+  private ScheduledExecutorService executor;
+  private ScheduledFuture<?> future;
+  private OfferReturnDelay returnDelay;
+  private OfferManager offerManager;
+  private TaskGroups taskGroups;
+  private RescheduleCalculator rescheduleCalculator;
+  private Preemptor preemptor;
+  private BiCache<String, TaskGroupKey> reservations;
+
+  @Before
+  public void setUp() {
+    storage = DbUtil.createStorage();
+    maintenance = createMock(MaintenanceController.class);
+    stateManager = createMock(StateManager.class);
+    assigner = createMock(TaskAssigner.class);
+    retryStrategy = createMock(BackoffStrategy.class);
+    driver = createMock(Driver.class);
+    executor = createMock(ScheduledExecutorService.class);
+    future = createMock(ScheduledFuture.class);
+    returnDelay = createMock(OfferReturnDelay.class);
+    rescheduleCalculator = createMock(RescheduleCalculator.class);
+    preemptor = createMock(Preemptor.class);
+    reservations = createMock(new Clazz<BiCache<String, TaskGroupKey>>() { });
+  }
+
+  private void replayAndCreateScheduler() {
+    control.replay();
+    offerManager = new OfferManagerImpl(driver, returnDelay, executor);
+    TaskScheduler scheduler = new TaskSchedulerImpl(storage,
+        stateManager,
+        assigner,
+        offerManager,
+        preemptor,
+        reservations);
+    taskGroups = new TaskGroups(
+        executor,
+        Amount.of(FIRST_SCHEDULE_DELAY_MS, Time.MILLISECONDS),
+        retryStrategy,
+        RateLimiter.create(100),
+        scheduler,
+        rescheduleCalculator);
+  }
+
+  private Capture<Runnable> expectOffer() {
+    return expectOfferDeclineIn(10);
+  }
+
+  private Capture<Runnable> expectOfferDeclineIn(long delayMillis) {
+    expect(returnDelay.get()).andReturn(Amount.of(delayMillis, Time.MILLISECONDS));
+    Capture<Runnable> runnable = createCapture();
+    executor.schedule(capture(runnable), eq(delayMillis), eq(TimeUnit.MILLISECONDS));
+    expectLastCall().andReturn(createMock(ScheduledFuture.class));
+    return runnable;
+  }
+
+  private void changeState(
+      IScheduledTask task,
+      ScheduleStatus oldState,
+      ScheduleStatus newState) {
+
+    final IScheduledTask copy = IScheduledTask.build(task.newBuilder().setStatus(newState));
+    // Insert the task if it doesn't already exist.
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(MutableStoreProvider storeProvider) {
+        TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
+        if (Iterables.isEmpty(taskStore.fetchTasks(Query.taskScoped(Tasks.id(copy))))) {
+          taskStore.saveTasks(ImmutableSet.of(copy));
+        }
+      }
+    });
+    taskGroups.taskChangedState(TaskStateChange.transition(copy, oldState));
+  }
+
+  private Capture<Runnable> expectTaskRetryIn(long penaltyMs) {
+    Capture<Runnable> capture = createCapture();
+    executor.schedule(
+        capture(capture),
+        eq(penaltyMs),
+        eq(TimeUnit.MILLISECONDS));
+    expectLastCall().andReturn(future);
+    return capture;
+  }
+
+  private Capture<Runnable> expectTaskGroupBackoff(long previousPenaltyMs, long nextPenaltyMs) {
+    expect(retryStrategy.calculateBackoffMs(previousPenaltyMs)).andReturn(nextPenaltyMs);
+    return expectTaskRetryIn(nextPenaltyMs);
+  }
+
+  @Test
+  public void testNoTasks() {
+    expectAnyMaintenanceCalls();
+    expectOfferDeclineIn(10);
+    expectOfferDeclineIn(10);
+
+    replayAndCreateScheduler();
+
+    offerManager.addOffer(OFFER_A);
+    offerManager.addOffer(OFFER_B);
+  }
+
+  @Test
+  public void testNoOffers() {
+    Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
+    expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
+    IScheduledTask task = createTask("a");
+    expectPreemptorCall(task.getAssignedTask());
+    expectReservationCheck(task);
+
+    replayAndCreateScheduler();
+
+    changeState(task, INIT, PENDING);
+    timeoutCapture.getValue().run();
+  }
+
+  private IScheduledTask createTask(String taskId) {
+    return createTask(taskId, null);
+  }
+
+  private IScheduledTask createTask(String taskId, @Nullable ScheduleStatus status) {
+    return setStatus(makeTask(taskId, TaskTestUtil.JOB), status);
+  }
+
+  private IScheduledTask setStatus(IScheduledTask task, @Nullable ScheduleStatus status) {
+    return IScheduledTask.build(task.newBuilder().setStatus(status));
+  }
+
+  @Test
+  public void testLoadFromStorage() {
+    final IScheduledTask a = createTask("a", KILLED);
+    final IScheduledTask b = createTask("b", PENDING);
+    final IScheduledTask c = createTask("c", RUNNING);
+
+    expect(rescheduleCalculator.getStartupScheduleDelayMs(b)).andReturn(10L);
+    expectTaskRetryIn(10);
+
+    replayAndCreateScheduler();
+
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(MutableStoreProvider store) {
+        store.getUnsafeTaskStore().saveTasks(ImmutableSet.of(a, b, c));
+      }
+    });
+    for (IScheduledTask task : ImmutableList.of(a, b, c)) {
+      taskGroups.taskChangedState(TaskStateChange.initialized(task));
+    }
+    changeState(c, RUNNING, FINISHED);
+  }
+
+  @Test
+  public void testTaskMissing() {
+    Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
+
+    replayAndCreateScheduler();
+
+    taskGroups.taskChangedState(TaskStateChange.transition(createTask("a", PENDING), INIT));
+    timeoutCapture.getValue().run();
+  }
+
+  private IExpectationSetters<Assignment> expectMaybeAssign(
+      HostOffer offer,
+      IScheduledTask task,
+      AttributeAggregate jobAggregate) {
+
+    return expect(assigner.maybeAssign(
+        EasyMock.anyObject(),
+        eq(offer),
+        eq(new ResourceRequest(task.getAssignedTask().getTask(), jobAggregate)),
+        eq(Tasks.id(task))));
+  }
+
+  private IExpectationSetters<?> expectNoReservation(String slaveId) {
+    return expect(reservations.get(slaveId)).andReturn(Optional.absent());
+  }
+
+  private IExpectationSetters<?> expectReservationCheck(IScheduledTask task) {
+    return expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())))
+        .andReturn(ImmutableSet.of());
+  }
+
+  @Test
+  public void testTaskAssigned() {
+    expectAnyMaintenanceCalls();
+    expectOfferDeclineIn(10);
+
+    IScheduledTask taskA = createTask("a", PENDING);
+    TaskInfo mesosTask = makeTaskInfo(taskA);
+
+    Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
+    expectNoReservation(SLAVE_A).times(2);
+    expectReservationCheck(taskA);
+    expectMaybeAssign(OFFER_A, taskA, EMPTY).andReturn(Assignment.failure());
+    expectPreemptorCall(taskA.getAssignedTask());
+
+    Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
+    expectMaybeAssign(OFFER_A, taskA, EMPTY).andReturn(Assignment.success(mesosTask));
+    driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
+
+    Capture<Runnable> timeoutCapture3 = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
+    expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
+    IScheduledTask taskB = createTask("b");
+    expectReservationCheck(taskB);
+    expectPreemptorCall(taskB.getAssignedTask());
+
+    replayAndCreateScheduler();
+
+    offerManager.addOffer(OFFER_A);
+    changeState(taskA, INIT, PENDING);
+    timeoutCapture.getValue().run();
+    timeoutCapture2.getValue().run();
+
+    // Ensure the offer was consumed.
+    changeState(taskB, INIT, PENDING);
+    timeoutCapture3.getValue().run();
+  }
+
+  @Test
+  public void testDriverNotReady() {
+    IScheduledTask task = createTask("a", PENDING);
+    TaskInfo mesosTask = TaskInfo.newBuilder()
+        .setName(Tasks.id(task))
+        .setTaskId(TaskID.newBuilder().setValue(Tasks.id(task)))
+        .setSlaveId(SlaveID.newBuilder().setValue("slaveId"))
+        .build();
+
+    Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
+    expectAnyMaintenanceCalls();
+    expectOfferDeclineIn(10);
+    expectNoReservation(SLAVE_A);
+    expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.success(mesosTask));
+    driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
+    expectLastCall().andThrow(new IllegalStateException("Driver not ready."));
+    expect(stateManager.changeState(
+        EasyMock.anyObject(),
+        eq("a"),
+        eq(Optional.of(PENDING)),
+        eq(LOST),
+        eq(TaskSchedulerImpl.LAUNCH_FAILED_MSG)))
+        .andReturn(StateChangeResult.SUCCESS);
+
+    replayAndCreateScheduler();
+
+    changeState(task, INIT, PENDING);
+    offerManager.addOffer(OFFER_A);
+    timeoutCapture.getValue().run();
+  }
+
+  @Test
+  public void testStorageException() {
+    IScheduledTask task = createTask("a", PENDING);
+    TaskInfo mesosTask = TaskInfo.newBuilder()
+        .setName(Tasks.id(task))
+        .setTaskId(TaskID.newBuilder().setValue(Tasks.id(task)))
+        .setSlaveId(SlaveID.newBuilder().setValue("slaveId"))
+        .build();
+
+    Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
+    expectAnyMaintenanceCalls();
+    expectOfferDeclineIn(10);
+    expectNoReservation(SLAVE_A).times(2);
+    expectMaybeAssign(OFFER_A, task, EMPTY).andThrow(new StorageException("Injected failure."));
+
+    Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
+    expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.success(mesosTask));
+    driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
+    expectLastCall();
+
+    replayAndCreateScheduler();
+
+    changeState(task, INIT, PENDING);
+    offerManager.addOffer(OFFER_A);
+    timeoutCapture.getValue().run();
+    timeoutCapture2.getValue().run();
+  }
+
+  @Test
+  public void testExpiration() {
+    IScheduledTask task = createTask("a", PENDING);
+
+    Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
+    Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
+    expectAnyMaintenanceCalls();
+    expectNoReservation(SLAVE_A);
+    expectReservationCheck(task).times(2);
+    expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure());
+    Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
+    expectPreemptorCall(task.getAssignedTask());
+    driver.declineOffer(OFFER_A.getOffer().getId());
+    expectTaskGroupBackoff(10, 20);
+    expectPreemptorCall(task.getAssignedTask());
+
+    replayAndCreateScheduler();
+
+    changeState(task, INIT, PENDING);
+    offerManager.addOffer(OFFER_A);
+    timeoutCapture.getValue().run();
+    offerExpirationCapture.getValue().run();
+    timeoutCapture2.getValue().run();
+  }
+
+  @Test
+  public void testOneOfferPerSlave() {
+    expectAnyMaintenanceCalls();
+    Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
+
+    HostOffer offerAB = new HostOffer(
+        Offers.makeOffer("OFFER_B").toBuilder().setSlaveId(OFFER_A.getOffer().getSlaveId()).build(),
+        IHostAttributes.build(new HostAttributes()));
+
+    driver.declineOffer(OFFER_A.getOffer().getId());
+    driver.declineOffer(offerAB.getOffer().getId());
+
+    replayAndCreateScheduler();
+
+    offerManager.addOffer(OFFER_A);
+    offerManager.addOffer(offerAB);
+    offerExpirationCapture.getValue().run();
+  }
+
+  @Test
+  public void testDontDeclineAcceptedOffer() throws OfferManager.LaunchException {
+    expectAnyMaintenanceCalls();
+    Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
+
+    Function<HostOffer, Assignment> offerAcceptor =
+        createMock(new Clazz<Function<HostOffer, Assignment>>() { });
+    final TaskInfo taskInfo = TaskInfo.getDefaultInstance();
+    expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(taskInfo));
+    driver.launchTask(OFFER_A.getOffer().getId(), taskInfo);
+
+    replayAndCreateScheduler();
+
+    offerManager.addOffer(OFFER_A);
+    offerManager.launchFirst(offerAcceptor, TaskGroupKey.from(ITaskConfig.build(new TaskConfig())));
+    offerExpirationCapture.getValue().run();
+  }
+
+  @Test
+  public void testBasicMaintenancePreferences() {
+    expectOffer();
+    expectOffer();
+    expectOffer();
+    expectOffer();
+
+    IScheduledTask taskA = createTask("A", PENDING);
+    TaskInfo mesosTaskA = makeTaskInfo(taskA);
+    expectNoReservation(SLAVE_A);
+    expectMaybeAssign(OFFER_A, taskA, EMPTY).andReturn(Assignment.success(mesosTaskA));
+    driver.launchTask(OFFER_A.getOffer().getId(), mesosTaskA);
+    Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
+
+    IScheduledTask taskB = createTask("B", PENDING);
+    TaskInfo mesosTaskB = makeTaskInfo(taskB);
+    expectNoReservation(SLAVE_B);
+    expectMaybeAssign(OFFER_B, taskB, EMPTY).andReturn(Assignment.success(mesosTaskB));
+    driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskB);
+    Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
+
+    replayAndCreateScheduler();
+
+    offerManager.addOffer(OFFER_D);
+    offerManager.addOffer(OFFER_C);
+    offerManager.addOffer(OFFER_B);
+    offerManager.addOffer(OFFER_A);
+
+    changeState(taskA, INIT, PENDING);
+    captureA.getValue().run();
+
+    changeState(taskB, INIT, PENDING);
+    captureB.getValue().run();
+  }
+
+  @Test
+  public void testChangingMaintenancePreferences() {
+    expectOffer();
+    expectOffer();
+    expectOffer();
+
+    IScheduledTask taskA = createTask("A", PENDING);
+    TaskInfo mesosTaskA = makeTaskInfo(taskA);
+    expectNoReservation(SLAVE_B);
+    expectMaybeAssign(OFFER_B, taskA, EMPTY).andReturn(Assignment.success(mesosTaskA));
+    driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskA);
+    Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
+
+    IScheduledTask taskB = createTask("B", PENDING);
+    TaskInfo mesosTaskB = makeTaskInfo(taskB);
+    HostOffer updatedOfferC = new HostOffer(
+        OFFER_C.getOffer(),
+        IHostAttributes.build(OFFER_C.getAttributes().newBuilder().setMode(NONE)));
+    expectNoReservation(SLAVE_C);
+    expectMaybeAssign(updatedOfferC, taskB, EMPTY).andReturn(Assignment.success(mesosTaskB));
+    driver.launchTask(OFFER_C.getOffer().getId(), mesosTaskB);
+    Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
+
+    replayAndCreateScheduler();
+
+    offerManager.addOffer(OFFER_A);
+    offerManager.addOffer(OFFER_B);
+    offerManager.addOffer(OFFER_C);
+
+    // Initially, we'd expect the offers to be consumed in order (A, B), with (C) unschedulable
+
+    // Expected order now (B), with (C, A) unschedulable
+    changeHostMaintenanceState(OFFER_A.getAttributes(), DRAINING);
+    changeState(taskA, INIT, PENDING);
+    captureA.getValue().run();
+
+    // Expected order now (C), with (A) unschedulable and (B) already consumed
+    changeHostMaintenanceState(OFFER_C.getAttributes(), NONE);
+    changeState(taskB, INIT, PENDING);
+    captureB.getValue().run();
+  }
+
+  private Capture<String> expectTaskScheduled(IScheduledTask task) {
+    TaskInfo mesosTask = makeTaskInfo(task);
+    Capture<String> taskId = createCapture();
+    expect(assigner.maybeAssign(
+        EasyMock.anyObject(),
+        EasyMock.anyObject(),
+        EasyMock.anyObject(),
+        capture(taskId))).andReturn(Assignment.success(mesosTask));
+    driver.launchTask(EasyMock.anyObject(), eq(mesosTask));
+    return taskId;
+  }
+
+  @Test
+  public void testResistsStarvation() {
+    // TODO(wfarner): This test requires intimate knowledge of the way futures are used inside
+    // TaskScheduler.  It's time to test using a real ScheduledExecutorService.
+
+    expectAnyMaintenanceCalls();
+
+    IScheduledTask jobA0 = setStatus(makeTask("a0", JobKeys.from("a", "b", "c")), PENDING);
+
+    ScheduledTask jobA1Builder = jobA0.newBuilder();
+    jobA1Builder.getAssignedTask().setTaskId("a1");
+    jobA1Builder.getAssignedTask().setInstanceId(1);
+    IScheduledTask jobA1 = IScheduledTask.build(jobA1Builder);
+
+    ScheduledTask jobA2Builder = jobA0.newBuilder();
+    jobA2Builder.getAssignedTask().setTaskId("a2");
+    jobA2Builder.getAssignedTask().setInstanceId(2);
+    IScheduledTask jobA2 = IScheduledTask.build(jobA2Builder);
+
+    IScheduledTask jobB0 = setStatus(makeTask("b0", JobKeys.from("d", "e", "f")), PENDING);
+
+    expectNoReservation(SLAVE_A);
+    expectNoReservation(SLAVE_B);
+
+    expectOfferDeclineIn(10);
+    expectOfferDeclineIn(10);
+    expectOfferDeclineIn(10);
+    expectOfferDeclineIn(10);
+
+    Capture<Runnable> timeoutA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
+    Capture<Runnable> timeoutB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
+
+    Capture<String> firstScheduled = expectTaskScheduled(jobA0);
+    Capture<String> secondScheduled = expectTaskScheduled(jobB0);
+
+    // Expect another watch of the task group for job A.
+    expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
+
+    replayAndCreateScheduler();
+
+    offerManager.addOffer(OFFER_A);
+    offerManager.addOffer(OFFER_B);
+    offerManager.addOffer(OFFER_C);
+    offerManager.addOffer(OFFER_D);
+    changeState(jobA0, INIT, PENDING);
+    changeState(jobA1, INIT, PENDING);
+    changeState(jobA2, INIT, PENDING);
+    changeState(jobB0, INIT, PENDING);
+    timeoutA.getValue().run();
+    timeoutB.getValue().run();
+    assertEquals(
+        ImmutableSet.of(Tasks.id(jobA0), Tasks.id(jobB0)),
+        ImmutableSet.of(firstScheduled.getValue(), secondScheduled.getValue()));
+  }
+
+  @Test
+  public void testTaskDeleted() {
+    expectAnyMaintenanceCalls();
+    expectOfferDeclineIn(10);
+
+    final IScheduledTask task = createTask("a", PENDING);
+
+    Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
+    expectNoReservation(SLAVE_A);
+    expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure());
+    expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 20);
+    expectReservationCheck(task);
+    expectPreemptorCall(task.getAssignedTask());
+
+    replayAndCreateScheduler();
+
+    offerManager.addOffer(OFFER_A);
+    changeState(task, INIT, PENDING);
+    timeoutCapture.getValue().run();
+
+    // Ensure the offer was consumed.
+    changeState(task, INIT, PENDING);
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().deleteTasks(Tasks.ids(task));
+      }
+    });
+    taskGroups.tasksDeleted(new TasksDeleted(ImmutableSet.of(task)));
+    timeoutCapture.getValue().run();
+  }
+
+  private TaskInfo makeTaskInfo(IScheduledTask task) {
+    return TaskInfo.newBuilder()
+        .setName(Tasks.id(task))
+        .setTaskId(TaskID.newBuilder().setValue(Tasks.id(task)))
+        .setSlaveId(SlaveID.newBuilder().setValue("slave-id" + task.toString()))
+        .build();
+  }
+
+  private void expectAnyMaintenanceCalls() {
+    expect(maintenance.getMode(isA(String.class))).andReturn(NONE).anyTimes();
+  }
+
+  private void changeHostMaintenanceState(IHostAttributes attributes, MaintenanceMode mode) {
+    offerManager.hostAttributesChanged(new PubsubEvent.HostAttributesChanged(
+        IHostAttributes.build(attributes.newBuilder().setMode(mode))));
+  }
+
+  private static HostOffer makeOffer(String offerId, String hostName, MaintenanceMode mode) {
+    Offer offer = Offers.makeOffer(offerId, hostName);
+    return new HostOffer(
+        offer,
+        IHostAttributes.build(new HostAttributes()
+            .setHost(hostName)
+            .setSlaveId(offer.getSlaveId().getValue())
+            .setAttributes(ImmutableSet.of())
+            .setMode(mode)));
+  }
+
+  private void expectPreemptorCall(IAssignedTask task) {
+    expect(preemptor.attemptPreemptionFor(
+        eq(task),
+        eq(EMPTY),
+        EasyMock.anyObject())).andReturn(Optional.absent());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java
new file mode 100644
index 0000000..4055021
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.state.StateChangeResult;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.easymock.Capture;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.INIT;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+
+public class TaskThrottlerTest extends EasyMockTest {
+
+  private RescheduleCalculator rescheduleCalculator;
+  private FakeClock clock;
+  private ScheduledExecutorService executor;
+  private StorageTestUtil storageUtil;
+  private StateManager stateManager;
+  private TaskThrottler throttler;
+
+  @Before
+  public void setUp() throws Exception {
+    rescheduleCalculator = createMock(RescheduleCalculator.class);
+    clock = new FakeClock();
+    executor = createMock(ScheduledExecutorService.class);
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+    stateManager = createMock(StateManager.class);
+    throttler = new TaskThrottler(
+        rescheduleCalculator,
+        clock,
+        executor,
+        storageUtil.storage,
+        stateManager);
+  }
+
+  @Test
+  public void testIgnoresNonThrottledTasks() {
+    control.replay();
+
+    throttler.taskChangedState(TaskStateChange.transition(makeTask("a", PENDING), INIT));
+    throttler.taskChangedState(TaskStateChange.transition(makeTask("a", RUNNING), PENDING));
+  }
+
+  @Test
+  public void testThrottledTask() {
+    IScheduledTask task = makeTask("a", THROTTLED);
+
+    long penaltyMs = 100;
+
+    expect(rescheduleCalculator.getFlappingPenaltyMs(task)).andReturn(penaltyMs);
+    Capture<Runnable> stateChangeCapture = expectThrottled(penaltyMs);
+    expectMovedToPending(task);
+
+    control.replay();
+
+    throttler.taskChangedState(TaskStateChange.transition(task, INIT));
+    stateChangeCapture.getValue().run();
+  }
+
+  @Test
+  public void testThrottledTaskReady() {
+    // Ensures that a sane delay is used when the task's penalty was already expired when
+    // the -> THROTTLED transition occurred (such as in the event of a scheduler failover).
+
+    IScheduledTask task = makeTask("a", THROTTLED);
+
+    long penaltyMs = 100;
+
+    expect(rescheduleCalculator.getFlappingPenaltyMs(task)).andReturn(penaltyMs);
+    Capture<Runnable> stateChangeCapture = expectThrottled(0);
+    expectMovedToPending(task);
+
+    control.replay();
+
+    clock.advance(Amount.of(1L, Time.HOURS));
+    throttler.taskChangedState(TaskStateChange.transition(task, INIT));
+    stateChangeCapture.getValue().run();
+  }
+
+  private Capture<Runnable> expectThrottled(long penaltyMs) {
+    Capture<Runnable> stateChangeCapture = createCapture();
+    expect(executor.schedule(
+        capture(stateChangeCapture),
+        eq(penaltyMs),
+        eq(TimeUnit.MILLISECONDS)))
+        .andReturn(null);
+    return stateChangeCapture;
+  }
+
+  private void expectMovedToPending(IScheduledTask task) {
+    expect(stateManager.changeState(
+        storageUtil.mutableStoreProvider,
+        Tasks.id(task),
+        Optional.of(THROTTLED),
+        PENDING,
+        Optional.absent()))
+        .andReturn(StateChangeResult.SUCCESS);
+  }
+
+  private IScheduledTask makeTask(String id, ScheduleStatus status) {
+    return IScheduledTask.build(new ScheduledTask()
+        .setTaskEvents(ImmutableList.of(
+            new TaskEvent()
+                .setStatus(status)
+                .setTimestamp(clock.nowMillis())))
+        .setStatus(status)
+        .setAssignedTask(new AssignedTask().setTaskId(id)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index dba1945..262f668 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -36,7 +36,6 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskEvent;
 import org.apache.aurora.scheduler.TaskIdGenerator;
-import org.apache.aurora.scheduler.async.RescheduleCalculator;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.base.Tasks;
@@ -45,6 +44,7 @@ import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.db.DbUtil;

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
index 8b99e0f..f5e1dd0 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -58,8 +58,6 @@ import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.TaskIdGenerator;
 import org.apache.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
-import org.apache.aurora.scheduler.async.RescheduleCalculator;
-import org.apache.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
@@ -67,6 +65,8 @@ import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
+import org.apache.aurora.scheduler.scheduling.RescheduleCalculator.RescheduleCalculatorImpl;
 import org.apache.aurora.scheduler.state.LockManager;
 import org.apache.aurora.scheduler.state.LockManagerImpl;
 import org.apache.aurora.scheduler.state.StateChangeResult;


Mime
View raw message