aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [4/8] aurora git commit: Break apart async package and AsyncModule into purpose-specific equivalents.
Date Wed, 22 Jul 2015 19:40:01 GMT
http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelayTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelayTest.java b/src/test/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelayTest.java
deleted file mode 100644
index 1aed40c..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelayTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.Random;
-
-import org.junit.Test;
-
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-
-public class RandomJitterReturnDelayTest extends EasyMockTest {
-  private void assertRandomJitterReturnDelay(
-      int minHoldTimeMs,
-      int jitterWindowMs,
-      boolean shouldThrow) {
-
-    int randomValue = 123;
-
-    Random mockRandom = control.createMock(Random.class);
-
-    if (!shouldThrow) {
-      expect(mockRandom.nextInt(jitterWindowMs)).andReturn(randomValue);
-    }
-
-    control.replay();
-
-    assertEquals(
-        minHoldTimeMs + randomValue,
-        new RandomJitterReturnDelay(
-            minHoldTimeMs,
-            jitterWindowMs,
-            mockRandom).get().getValue().intValue());
-  }
-
-  @Test
-  public void testRandomJitterReturnDelay() throws Exception {
-    assertRandomJitterReturnDelay(100, 200, false);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testNegativeHoldTimeThrowsIllegalArgumentException() throws Exception {
-    assertRandomJitterReturnDelay(-1, 200, true);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testNegativeWindowThrowsIllegalArgumentException() throws Exception {
-    assertRandomJitterReturnDelay(100, -1, true);
-  }
-
-  @Test
-  public void testZeroHoldTime() throws Exception {
-    assertRandomJitterReturnDelay(0, 200, false);
-  }
-
-  @Test
-  public void testZeroWindow() throws Exception {
-    assertRandomJitterReturnDelay(100, 0, false);
-  }
-
-  @Test
-  public void testZeroHoldTimeZeroWindow() throws Exception {
-    assertRandomJitterReturnDelay(0, 0, false);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/RescheduleCalculatorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/RescheduleCalculatorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/RescheduleCalculatorImplTest.java
deleted file mode 100644
index 131bd82..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/RescheduleCalculatorImplTest.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.Map;
-import java.util.Map.Entry;
-
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.BackoffStrategy;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
-import static org.apache.aurora.gen.ScheduleStatus.INIT;
-import static org.apache.aurora.gen.ScheduleStatus.KILLED;
-import static org.apache.aurora.gen.ScheduleStatus.KILLING;
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-
-public class RescheduleCalculatorImplTest extends EasyMockTest {
-
-  private static final Amount<Long, Time> FLAPPING_THRESHOLD = Amount.of(1L, Time.MINUTES);
-  private static final Amount<Integer, Time> MAX_STARTUP_DELAY = Amount.of(10, Time.MINUTES);
-
-  private StorageTestUtil storageUtil;
-  private BackoffStrategy backoff;
-  private RescheduleCalculator rescheduleCalculator;
-
-  @Before
-  public void setUp() {
-    storageUtil = new StorageTestUtil(this);
-    backoff = createMock(BackoffStrategy.class);
-    rescheduleCalculator = new RescheduleCalculatorImpl(
-        storageUtil.storage,
-        new RescheduleCalculatorImpl.RescheduleCalculatorSettings(
-            backoff,
-            FLAPPING_THRESHOLD,
-            MAX_STARTUP_DELAY));
-    storageUtil.expectOperations();
-  }
-
-  @Test
-  public void testNoPenaltyForNoAncestor() {
-    control.replay();
-
-    assertEquals(0L, rescheduleCalculator.getFlappingPenaltyMs(makeTask("a", INIT)));
-  }
-
-  @Test
-  public void testNoPenaltyDeletedAncestor() {
-    String ancestorId = "a";
-    storageUtil.expectTaskFetch(Query.taskScoped(ancestorId));
-
-    control.replay();
-
-    assertEquals(
-        0L,
-        rescheduleCalculator.getFlappingPenaltyMs(setAncestor(makeTask("b", INIT), ancestorId)));
-  }
-
-  @Test
-  public void testFlappingTask() {
-    IScheduledTask ancestor = makeFlappyTask("a");
-    storageUtil.expectTaskFetch(Query.taskScoped(Tasks.id(ancestor)), ancestor);
-    long penaltyMs = 1000L;
-    expect(backoff.calculateBackoffMs(0L)).andReturn(penaltyMs);
-
-    control.replay();
-
-    assertEquals(
-        penaltyMs,
-        rescheduleCalculator.getFlappingPenaltyMs(
-            setAncestor(makeTask("b", INIT), Tasks.id(ancestor))));
-  }
-
-  @Test
-  public void testFlappingTasksBackoffTruncation() {
-    // Ensures that the reschedule calculator detects penalty truncation and avoids inspecting
-    // ancestors once truncated.
-    IScheduledTask taskA = setAncestor(makeFlappyTask("a"), "bugIfQueried");
-    IScheduledTask taskB = setAncestor(makeFlappyTask("b"), Tasks.id(taskA));
-    IScheduledTask taskC = setAncestor(makeFlappyTask("c"), Tasks.id(taskB));
-    IScheduledTask taskD = setAncestor(makeFlappyTask("d"), Tasks.id(taskC));
-
-    Map<IScheduledTask, Long> ancestorsAndPenalties = ImmutableMap.of(
-        taskD, 100L,
-        taskC, 200L,
-        taskB, 300L,
-        taskA, 300L);
-
-    long lastPenalty = 0L;
-    for (Map.Entry<IScheduledTask, Long> taskAndPenalty : ancestorsAndPenalties.entrySet()) {
-      storageUtil.expectTaskFetch(
-          Query.taskScoped(Tasks.id(taskAndPenalty.getKey())),
-          taskAndPenalty.getKey());
-      expect(backoff.calculateBackoffMs(lastPenalty)).andReturn(taskAndPenalty.getValue());
-      lastPenalty = taskAndPenalty.getValue();
-    }
-
-    control.replay();
-
-    IScheduledTask newTask = setAncestor(makeFlappyTask("newTask"), Tasks.id(taskD));
-    assertEquals(300L, rescheduleCalculator.getFlappingPenaltyMs(newTask));
-  }
-
-  @Test
-  public void testNoPenaltyForInterruptedTasks() {
-    IScheduledTask ancestor = setEvents(
-        makeTask("a", KILLED),
-        ImmutableMap.of(INIT, 0L, PENDING, 100L, RUNNING, 200L, KILLING, 300L, KILLED, 400L));
-    storageUtil.expectTaskFetch(Query.taskScoped(Tasks.id(ancestor)), ancestor);
-
-    control.replay();
-
-    assertEquals(
-        0L,
-        rescheduleCalculator.getFlappingPenaltyMs(
-            setAncestor(makeTask("b", INIT), Tasks.id(ancestor))));
-  }
-
-  private IScheduledTask makeFlappyTask(String taskId) {
-    return setEvents(
-        makeTask(taskId, FINISHED),
-        ImmutableMap.of(INIT, 0L, PENDING, 100L, RUNNING, 200L, FINISHED, 300L));
-  }
-
-  private IScheduledTask makeTask(String taskId) {
-    return IScheduledTask.build(new ScheduledTask()
-        .setAssignedTask(new AssignedTask()
-            .setInstanceId(0)
-            .setTaskId(taskId)
-            .setTask(new TaskConfig()
-                .setJobName("job-" + taskId)
-                .setOwner(new Identity().setRole("role-" + taskId).setUser("user-" + taskId))
-                .setEnvironment("env-" + taskId))));
-  }
-
-  private IScheduledTask makeTask(String taskId, ScheduleStatus status) {
-    return IScheduledTask.build(makeTask(taskId).newBuilder().setStatus(status));
-  }
-
-  private IScheduledTask setAncestor(IScheduledTask task, String ancestorId) {
-    return IScheduledTask.build(task.newBuilder().setAncestorId(ancestorId));
-  }
-
-  private static final Function<Map.Entry<ScheduleStatus, Long>, TaskEvent> TO_EVENT =
-      new Function<Entry<ScheduleStatus, Long>, TaskEvent>() {
-        @Override
-        public TaskEvent apply(Entry<ScheduleStatus, Long> input) {
-          return new TaskEvent().setStatus(input.getKey()).setTimestamp(input.getValue());
-        }
-      };
-
-  private IScheduledTask setEvents(IScheduledTask task, Map<ScheduleStatus, Long> events) {
-    return IScheduledTask.build(task.newBuilder().setTaskEvents(
-        FluentIterable.from(events.entrySet()).transform(TO_EVENT).toList()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java
deleted file mode 100644
index 51256f4..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskGroupsTest.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.RateLimiter;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.BackoffStrategy;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.junit.Before;
-import org.junit.Test;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-import static org.apache.aurora.gen.ScheduleStatus.INIT;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-
-public class TaskGroupsTest extends EasyMockTest {
-
-  private static final long FIRST_SCHEDULE_DELAY_MS = 1L;
-
-  private ScheduledExecutorService executor;
-  private BackoffStrategy backoffStrategy;
-  private TaskScheduler taskScheduler;
-  private RateLimiter rateLimiter;
-
-  private TaskGroups taskGroups;
-
-  @Before
-  public void setUp() throws Exception {
-    executor = createMock(ScheduledExecutorService.class);
-    backoffStrategy = createMock(BackoffStrategy.class);
-    taskScheduler = createMock(TaskScheduler.class);
-    rateLimiter = createMock(RateLimiter.class);
-    taskGroups = new TaskGroups(
-        executor,
-        Amount.of(FIRST_SCHEDULE_DELAY_MS, Time.MILLISECONDS),
-        backoffStrategy,
-        rateLimiter,
-        taskScheduler,
-        createMock(RescheduleCalculator.class));
-  }
-
-  @Test
-  public void testEvaluatedAfterFirstSchedulePenalty() {
-    executor.schedule(
-        EasyMock.<Runnable>anyObject(),
-        EasyMock.eq(FIRST_SCHEDULE_DELAY_MS),
-        EasyMock.eq(MILLISECONDS));
-    expectLastCall().andAnswer(new IAnswer<ScheduledFuture<Void>>() {
-      @Override
-      public ScheduledFuture<Void> answer() {
-        ((Runnable) EasyMock.getCurrentArguments()[0]).run();
-        return null;
-      }
-    });
-    expect(rateLimiter.acquire()).andReturn(0D);
-    expect(taskScheduler.schedule("a")).andReturn(true);
-
-    control.replay();
-
-    taskGroups.taskChangedState(TaskStateChange.transition(makeTask("a"), INIT));
-  }
-
-  private Capture<Runnable> expectEvaluate() {
-    Capture<Runnable> capture = createCapture();
-    executor.schedule(
-        EasyMock.capture(capture),
-        EasyMock.eq(FIRST_SCHEDULE_DELAY_MS),
-        EasyMock.eq(MILLISECONDS));
-    expectLastCall().andReturn(null);
-    return capture;
-  }
-
-  @Test
-  public void testTaskDeletedBeforeEvaluating() {
-    final IScheduledTask task = makeTask("a");
-
-    Capture<Runnable> evaluate = expectEvaluate();
-
-    expect(rateLimiter.acquire()).andReturn(0D);
-    expect(taskScheduler.schedule(Tasks.id(task))).andAnswer(new IAnswer<Boolean>() {
-      @Override
-      public Boolean answer() {
-        // Test a corner case where a task is deleted while it is being evaluated by the task
-        // scheduler.  If not handled carefully, this could result in the scheduler trying again
-        // later to satisfy the deleted task.
-        taskGroups.tasksDeleted(new TasksDeleted(ImmutableSet.of(task)));
-
-        return false;
-      }
-    });
-    expect(backoffStrategy.calculateBackoffMs(FIRST_SCHEDULE_DELAY_MS)).andReturn(0L);
-
-    control.replay();
-
-    taskGroups.taskChangedState(TaskStateChange.transition(makeTask(Tasks.id(task)), INIT));
-    evaluate.getValue().run();
-  }
-
-  private static IScheduledTask makeTask(String id) {
-    return IScheduledTask.build(new ScheduledTask()
-        .setStatus(ScheduleStatus.PENDING)
-        .setAssignedTask(new AssignedTask()
-            .setTaskId(id)
-            .setTask(new TaskConfig()
-                .setOwner(new Identity("owner", "owner"))
-                .setEnvironment("test")
-                .setJobName("job"))));
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java
deleted file mode 100644
index 6eaf3ce..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java
+++ /dev/null
@@ -1,398 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.common.base.Command;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.testing.FakeClock;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.ExecutorConfig;
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.JobKey;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.async.TaskHistoryPruner.HistoryPrunnerSettings;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
-import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
-import static org.apache.aurora.gen.ScheduleStatus.KILLED;
-import static org.apache.aurora.gen.ScheduleStatus.LOST;
-import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.apache.aurora.gen.ScheduleStatus.STARTING;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.fail;
-
-public class TaskHistoryPrunerTest extends EasyMockTest {
-  private static final String JOB_A = "job-a";
-  private static final String TASK_ID = "task_id";
-  private static final String SLAVE_HOST = "HOST_A";
-  private static final Amount<Long, Time> ONE_MS = Amount.of(1L, Time.MILLISECONDS);
-  private static final Amount<Long, Time> ONE_MINUTE = Amount.of(1L, Time.MINUTES);
-  private static final Amount<Long, Time> ONE_DAY = Amount.of(1L, Time.DAYS);
-  private static final Amount<Long, Time> ONE_HOUR = Amount.of(1L, Time.HOURS);
-  private static final int PER_JOB_HISTORY = 2;
-
-  private ScheduledFuture<?> future;
-  private ScheduledExecutorService executor;
-  private FakeClock clock;
-  private StateManager stateManager;
-  private StorageTestUtil storageUtil;
-  private TaskHistoryPruner pruner;
-
-  @Before
-  public void setUp() {
-    future = createMock(new Clazz<ScheduledFuture<?>>() { });
-    executor = createMock(ScheduledExecutorService.class);
-    clock = new FakeClock();
-    stateManager = createMock(StateManager.class);
-    storageUtil = new StorageTestUtil(this);
-    storageUtil.expectOperations();
-    pruner = new TaskHistoryPruner(
-        executor,
-        stateManager,
-        clock,
-        new HistoryPrunnerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY),
-        storageUtil.storage);
-  }
-
-  @Test
-  public void testNoPruning() {
-    long taskATimestamp = clock.nowMillis();
-    IScheduledTask a = makeTask("a", FINISHED);
-
-    clock.advance(ONE_MS);
-    long taskBTimestamp = clock.nowMillis();
-    IScheduledTask b = makeTask("b", LOST);
-
-    expectNoImmediatePrune(ImmutableSet.of(a));
-    expectOneDelayedPrune(taskATimestamp);
-    expectNoImmediatePrune(ImmutableSet.of(a, b));
-    expectOneDelayedPrune(taskBTimestamp);
-
-    control.replay();
-
-    pruner.recordStateChange(TaskStateChange.initialized(a));
-    pruner.recordStateChange(TaskStateChange.initialized(b));
-  }
-
-  @Test
-  public void testStorageStartedWithPruning() {
-    long taskATimestamp = clock.nowMillis();
-    IScheduledTask a = makeTask("a", FINISHED);
-
-    clock.advance(ONE_MINUTE);
-    long taskBTimestamp = clock.nowMillis();
-    IScheduledTask b = makeTask("b", LOST);
-
-    clock.advance(ONE_MINUTE);
-    long taskCTimestamp = clock.nowMillis();
-    IScheduledTask c = makeTask("c", FINISHED);
-
-    clock.advance(ONE_MINUTE);
-    IScheduledTask d = makeTask("d", FINISHED);
-    IScheduledTask e = makeTask("job-x", "e", FINISHED);
-
-    expectNoImmediatePrune(ImmutableSet.of(a));
-    expectOneDelayedPrune(taskATimestamp);
-    expectNoImmediatePrune(ImmutableSet.of(a, b));
-    expectOneDelayedPrune(taskBTimestamp);
-    expectImmediatePrune(ImmutableSet.of(a, b, c), a);
-    expectOneDelayedPrune(taskCTimestamp);
-    expectImmediatePrune(ImmutableSet.of(b, c, d), b);
-    expectDefaultDelayedPrune();
-    expectNoImmediatePrune(ImmutableSet.of(e));
-    expectDefaultDelayedPrune();
-
-    control.replay();
-
-    for (IScheduledTask task : ImmutableList.of(a, b, c, d, e)) {
-      pruner.recordStateChange(TaskStateChange.initialized(task));
-    }
-  }
-
-  @Test
-  public void testStateChange() {
-    IScheduledTask starting = makeTask("a", STARTING);
-    IScheduledTask running = copy(starting, RUNNING);
-    IScheduledTask killed = copy(starting, KILLED);
-
-    expectNoImmediatePrune(ImmutableSet.of(killed));
-    expectDefaultDelayedPrune();
-
-    control.replay();
-
-    // No future set for non-terminal state transition.
-    changeState(starting, running);
-
-    // Future set for terminal state transition.
-    changeState(running, killed);
-  }
-
-  @Test
-  public void testActivateFutureAndExceedHistoryGoal() {
-    IScheduledTask running = makeTask("a", RUNNING);
-    IScheduledTask killed = copy(running, KILLED);
-    expectNoImmediatePrune(ImmutableSet.of(running));
-    Capture<Runnable> delayedDelete = expectDefaultDelayedPrune();
-
-    // Expect task "a" to be pruned when future is activated.
-    expectDeleteTasks("a");
-
-    control.replay();
-
-    // Capture future for inactive task "a"
-    changeState(running, killed);
-    clock.advance(ONE_HOUR);
-    // Execute future to prune task "a" from the system.
-    delayedDelete.getValue().run();
-  }
-
-  @Test
-  public void testJobHistoryExceeded() {
-    IScheduledTask a = makeTask("a", RUNNING);
-    clock.advance(ONE_MS);
-    IScheduledTask aKilled = copy(a, KILLED);
-
-    IScheduledTask b = makeTask("b", RUNNING);
-    clock.advance(ONE_MS);
-    IScheduledTask bKilled = copy(b, KILLED);
-
-    IScheduledTask c = makeTask("c", RUNNING);
-    clock.advance(ONE_MS);
-    IScheduledTask cLost = copy(c, LOST);
-
-    IScheduledTask d = makeTask("d", RUNNING);
-    clock.advance(ONE_MS);
-    IScheduledTask dLost = copy(d, LOST);
-
-    expectNoImmediatePrune(ImmutableSet.of(a));
-    expectDefaultDelayedPrune();
-    expectNoImmediatePrune(ImmutableSet.of(a, b));
-    expectDefaultDelayedPrune();
-    expectNoImmediatePrune(ImmutableSet.of(a, b)); // no pruning yet due to min threshold
-    expectDefaultDelayedPrune();
-    clock.advance(ONE_HOUR);
-    expectImmediatePrune(ImmutableSet.of(a, b, c, d), a, b); // now prune 2 tasks
-    expectDefaultDelayedPrune();
-
-    control.replay();
-
-    changeState(a, aKilled);
-    changeState(b, bKilled);
-    changeState(c, cLost);
-    changeState(d, dLost);
-  }
-
-  // TODO(William Farner): Consider removing the thread safety tests.  Now that intrinsic locks
-  // are not used, it is rather awkward to test this.
-  @Test
-  public void testThreadSafeStateChangeEvent() throws Exception {
-    // This tests against regression where an executor pruning a task holds an intrinsic lock and
-    // an unrelated task state change in the scheduler fires an event that requires this intrinsic
-    // lock. This causes a deadlock when the executor tries to acquire a lock held by the event
-    // fired.
-
-    pruner = prunerWithRealExecutor();
-    Command onDeleted = new Command() {
-      @Override
-      public void execute() {
-        // The goal is to verify that the call does not deadlock. We do not care about the outcome.
-        IScheduledTask b = makeTask("b", ASSIGNED);
-
-        changeState(b, STARTING);
-      }
-    };
-    CountDownLatch taskDeleted = expectTaskDeleted(onDeleted, TASK_ID);
-
-    control.replay();
-
-    // Change the task to a terminal state and wait for it to be pruned.
-    changeState(makeTask(TASK_ID, RUNNING), KILLED);
-    taskDeleted.await();
-  }
-
-  private TaskHistoryPruner prunerWithRealExecutor() {
-    ScheduledExecutorService realExecutor = Executors.newScheduledThreadPool(1,
-        new ThreadFactoryBuilder()
-            .setDaemon(true)
-            .setNameFormat("testThreadSafeEvents-executor")
-            .build());
-    return new TaskHistoryPruner(
-        realExecutor,
-        stateManager,
-        clock,
-        new HistoryPrunnerSettings(Amount.of(1L, Time.MILLISECONDS), ONE_MS, PER_JOB_HISTORY),
-        storageUtil.storage);
-  }
-
-  private CountDownLatch expectTaskDeleted(final Command onDelete, String taskId) {
-    final CountDownLatch deleteCalled = new CountDownLatch(1);
-    final CountDownLatch eventDelivered = new CountDownLatch(1);
-
-    Thread eventDispatch = new Thread() {
-      @Override
-      public void run() {
-        try {
-          deleteCalled.await();
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          fail("Interrupted while awaiting for delete call.");
-          return;
-        }
-        onDelete.execute();
-        eventDelivered.countDown();
-      }
-    };
-    eventDispatch.setDaemon(true);
-    eventDispatch.setName(getClass().getName() + "-EventDispatch");
-    eventDispatch.start();
-
-    stateManager.deleteTasks(storageUtil.mutableStoreProvider, ImmutableSet.of(taskId));
-    expectLastCall().andAnswer(new IAnswer<Void>() {
-      @Override
-      public Void answer() {
-        deleteCalled.countDown();
-        try {
-          eventDelivered.await();
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          fail("Interrupted while awaiting for event delivery.");
-        }
-        return null;
-      }
-    });
-
-    return eventDelivered;
-  }
-
-  private void expectDeleteTasks(String... tasks) {
-    stateManager.deleteTasks(storageUtil.mutableStoreProvider, ImmutableSet.copyOf(tasks));
-  }
-
-  private Capture<Runnable> expectDefaultDelayedPrune() {
-    return expectDelayedPrune(ONE_DAY.as(Time.MILLISECONDS), 1);
-  }
-
-  private Capture<Runnable> expectOneDelayedPrune(long timestampMillis) {
-    return expectDelayedPrune(timestampMillis, 1);
-  }
-
-  private void expectNoImmediatePrune(ImmutableSet<IScheduledTask> tasksInJob) {
-    expectImmediatePrune(tasksInJob);
-  }
-
-  private void expectImmediatePrune(
-      ImmutableSet<IScheduledTask> tasksInJob,
-      IScheduledTask... pruned) {
-
-    // Expect a deferred prune operation when a new task is being watched.
-    executor.submit(EasyMock.<Runnable>anyObject());
-    expectLastCall().andAnswer(
-        new IAnswer<Future<?>>() {
-          @Override
-          public Future<?> answer() {
-            Runnable work = (Runnable) EasyMock.getCurrentArguments()[0];
-            work.run();
-            return null;
-          }
-        }
-    );
-
-    IJobKey jobKey = Iterables.getOnlyElement(
-        FluentIterable.from(tasksInJob).transform(Tasks.SCHEDULED_TO_JOB_KEY).toSet());
-    storageUtil.expectTaskFetch(TaskHistoryPruner.jobHistoryQuery(jobKey), tasksInJob);
-    if (pruned.length > 0) {
-      stateManager.deleteTasks(storageUtil.mutableStoreProvider, Tasks.ids(pruned));
-    }
-  }
-
-  private Capture<Runnable> expectDelayedPrune(long timestampMillis, int count) {
-    Capture<Runnable> capture = createCapture();
-    executor.schedule(
-        EasyMock.capture(capture),
-        eq(pruner.calculateTimeout(timestampMillis)),
-        eq(TimeUnit.MILLISECONDS));
-    expectLastCall().andReturn(future).times(count);
-    return capture;
-  }
-
-  private void changeState(IScheduledTask oldStateTask, IScheduledTask newStateTask) {
-    pruner.recordStateChange(TaskStateChange.transition(newStateTask, oldStateTask.getStatus()));
-  }
-
-  private void changeState(IScheduledTask oldStateTask, ScheduleStatus status) {
-    pruner.recordStateChange(
-        TaskStateChange.transition(copy(oldStateTask, status), oldStateTask.getStatus()));
-  }
-
-  private IScheduledTask copy(IScheduledTask task, ScheduleStatus status) {
-    return IScheduledTask.build(task.newBuilder().setStatus(status));
-  }
-
-  private IScheduledTask makeTask(
-      String job,
-      String taskId,
-      ScheduleStatus status) {
-
-    return IScheduledTask.build(new ScheduledTask()
-        .setStatus(status)
-        .setTaskEvents(ImmutableList.of(new TaskEvent(clock.nowMillis(), status)))
-        .setAssignedTask(makeAssignedTask(job, taskId)));
-  }
-
-  private IScheduledTask makeTask(String taskId, ScheduleStatus status) {
-    return makeTask(JOB_A, taskId, status);
-  }
-
-  private AssignedTask makeAssignedTask(String job, String taskId) {
-    return new AssignedTask()
-        .setSlaveHost(SLAVE_HOST)
-        .setTaskId(taskId)
-        .setTask(new TaskConfig()
-            .setJob(new JobKey("role", "staging45", job))
-            .setOwner(new Identity().setRole("role").setUser("user"))
-            .setEnvironment("staging45")
-            .setJobName(job)
-            .setExecutorConfig(new ExecutorConfig("aurora", "config")));
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/TaskReconcilerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskReconcilerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskReconcilerTest.java
deleted file mode 100644
index 0011412..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskReconcilerTest.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.collect.ImmutableSet;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.StatsProvider;
-import com.twitter.common.testing.easymock.EasyMockTest;
-
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.TaskTestUtil;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.mesos.Driver;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
-import org.junit.Before;
-import org.junit.Test;
-
-import static com.twitter.common.quantity.Time.MINUTES;
-
-import static org.apache.aurora.scheduler.async.TaskReconciler.EXPLICIT_STAT_NAME;
-import static org.apache.aurora.scheduler.async.TaskReconciler.IMPLICIT_STAT_NAME;
-import static org.apache.aurora.scheduler.async.TaskReconciler.TASK_TO_PROTO;
-import static org.apache.aurora.scheduler.async.TaskReconciler.TaskReconcilerSettings;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.assertEquals;
-
-public class TaskReconcilerTest extends EasyMockTest {
-  private static final Amount<Long, Time> INITIAL_DELAY = Amount.of(10L, MINUTES);
-  private static final Amount<Long, Time> EXPLICIT_SCHEDULE = Amount.of(60L, MINUTES);
-  private static final Amount<Long, Time> IMPLICT_SCHEDULE = Amount.of(180L, MINUTES);
-  private static final Amount<Long, Time> SPREAD = Amount.of(30L, MINUTES);
-  private static final TaskReconcilerSettings SETTINGS = new TaskReconcilerSettings(
-      INITIAL_DELAY,
-      EXPLICIT_SCHEDULE,
-      IMPLICT_SCHEDULE,
-      SPREAD);
-
-  private StorageTestUtil storageUtil;
-  private StatsProvider statsProvider;
-  private Driver driver;
-  private ScheduledExecutorService executorService;
-  private FakeScheduledExecutor clock;
-  private AtomicLong explicitRuns;
-  private AtomicLong implicitRuns;
-
-  @Before
-  public void setUp() {
-    storageUtil = new StorageTestUtil(this);
-    statsProvider = createMock(StatsProvider.class);
-    driver = createMock(Driver.class);
-    executorService = createMock(ScheduledExecutorService.class);
-    explicitRuns = new AtomicLong();
-    implicitRuns = new AtomicLong();
-  }
-
-  @Test
-  public void testExecution() {
-    expect(statsProvider.makeCounter(EXPLICIT_STAT_NAME)).andReturn(explicitRuns);
-    expect(statsProvider.makeCounter(IMPLICIT_STAT_NAME)).andReturn(implicitRuns);
-    clock = FakeScheduledExecutor.scheduleAtFixedRateExecutor(executorService, 2, 5);
-
-    IScheduledTask task = TaskTestUtil.makeTask("id1", TaskTestUtil.JOB);
-    storageUtil.expectOperations();
-    storageUtil.expectTaskFetch(Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES), task)
-        .times(5);
-
-    driver.reconcileTasks(ImmutableSet.of(TASK_TO_PROTO.apply(task)));
-    expectLastCall().times(5);
-
-    driver.reconcileTasks(ImmutableSet.of());
-    expectLastCall().times(2);
-
-    control.replay();
-
-    TaskReconciler reconciler = new TaskReconciler(
-        SETTINGS,
-        storageUtil.storage,
-        driver,
-        executorService,
-        statsProvider);
-
-    reconciler.startAsync().awaitRunning();
-
-    clock.advance(INITIAL_DELAY);
-    assertEquals(1L, explicitRuns.get());
-    assertEquals(0L, implicitRuns.get());
-
-    clock.advance(SPREAD);
-    assertEquals(1L, explicitRuns.get());
-    assertEquals(1L, implicitRuns.get());
-
-    clock.advance(EXPLICIT_SCHEDULE);
-    assertEquals(2L, explicitRuns.get());
-    assertEquals(1L, implicitRuns.get());
-
-    clock.advance(IMPLICT_SCHEDULE);
-    assertEquals(5L, explicitRuns.get());
-    assertEquals(2L, implicitRuns.get());
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testInvalidImplicitDelay() throws Exception {
-    control.replay();
-
-    new TaskReconcilerSettings(
-        INITIAL_DELAY,
-        EXPLICIT_SCHEDULE,
-        IMPLICT_SCHEDULE,
-        Amount.of(Long.MAX_VALUE, MINUTES));
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testInvalidExplicitDelay() throws Exception {
-    control.replay();
-
-    new TaskReconcilerSettings(
-        Amount.of(Long.MAX_VALUE, MINUTES),
-        EXPLICIT_SCHEDULE,
-        IMPLICT_SCHEDULE,
-        SPREAD);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
deleted file mode 100644
index 45adb2e..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.TypeLiteral;
-import com.twitter.common.stats.StatsProvider;
-import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.Clock;
-
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
-import org.apache.aurora.scheduler.async.preemptor.BiCache;
-import org.apache.aurora.scheduler.async.preemptor.Preemptor;
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.base.TaskTestUtil;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.events.PubsubEventModule;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.state.PubsubTestUtil;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.state.TaskAssigner;
-import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
-import org.apache.aurora.scheduler.state.TaskAssigner.Assignment.Result;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.db.DbUtil;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.apache.aurora.scheduler.testing.FakeStatsProvider;
-import org.apache.mesos.Protos.TaskInfo;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.easymock.IExpectationSetters;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
-import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class TaskSchedulerImplTest extends EasyMockTest {
-
-  private static final IScheduledTask TASK_A =
-      TaskTestUtil.makeTask("a", JobKeys.from("a", "a", "a"));
-  private static final IScheduledTask TASK_B =
-      TaskTestUtil.makeTask("b", JobKeys.from("b", "b", "b"));
-  private static final HostOffer OFFER = new HostOffer(
-      Offers.makeOffer("OFFER_A", "HOST_A"),
-      IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE)));
-
-  private static final String SLAVE_ID = OFFER.getOffer().getSlaveId().getValue();
-
-  private static final TaskGroupKey GROUP_A = TaskGroupKey.from(TASK_A.getAssignedTask().getTask());
-  private static final TaskGroupKey GROUP_B = TaskGroupKey.from(TASK_B.getAssignedTask().getTask());
-
-  private StorageTestUtil storageUtil;
-  private StateManager stateManager;
-  private TaskAssigner assigner;
-  private OfferManager offerManager;
-  private TaskScheduler scheduler;
-  private Preemptor preemptor;
-  private BiCache<String, TaskGroupKey> reservations;
-  private EventSink eventSink;
-
-  @Before
-  public void setUp() throws Exception {
-    storageUtil = new StorageTestUtil(this);
-    stateManager = createMock(StateManager.class);
-    assigner = createMock(TaskAssigner.class);
-    offerManager = createMock(OfferManager.class);
-    preemptor = createMock(Preemptor.class);
-    reservations = createMock(new Clazz<BiCache<String, TaskGroupKey>>() { });
-
-    Injector injector = getInjector(storageUtil.storage);
-    scheduler = injector.getInstance(TaskScheduler.class);
-    eventSink = PubsubTestUtil.startPubsub(injector);
-  }
-
-  private Injector getInjector(final Storage storageImpl) {
-    return Guice.createInjector(
-        new PubsubEventModule(false),
-        new AbstractModule() {
-          @Override
-          protected void configure() {
-            bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).toInstance(reservations);
-            bind(TaskScheduler.class).to(TaskSchedulerImpl.class);
-            bind(Preemptor.class).toInstance(preemptor);
-            bind(OfferManager.class).toInstance(offerManager);
-            bind(StateManager.class).toInstance(stateManager);
-            bind(TaskAssigner.class).toInstance(assigner);
-            bind(Clock.class).toInstance(createMock(Clock.class));
-            bind(StatsProvider.class).toInstance(new FakeStatsProvider());
-            bind(Storage.class).toInstance(storageImpl);
-            PubsubEventModule.bindSubscriber(binder(), TaskScheduler.class);
-          }
-        });
-  }
-
-  private void expectTaskStillPendingQuery(IScheduledTask task) {
-    storageUtil.expectTaskFetch(
-        Query.taskScoped(Tasks.id(task)).byStatus(PENDING),
-        ImmutableSet.of(task));
-  }
-
-  private void expectAssigned(IScheduledTask task) {
-    expect(assigner.maybeAssign(
-        storageUtil.mutableStoreProvider,
-        OFFER,
-        new ResourceRequest(task.getAssignedTask().getTask(), EMPTY),
-        Tasks.id(task))).andReturn(Assignment.success(TaskInfo.getDefaultInstance()));
-  }
-
-  @Test
-  public void testReservation() throws Exception {
-    storageUtil.expectOperations();
-
-    expectTaskStillPendingQuery(TASK_A);
-    expectActiveJobFetch(TASK_A);
-    expectLaunchAttempt(false);
-    // Reserve "a" with offerA
-    expectReservationCheck(TASK_A);
-    expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID));
-    expectAddReservation(SLAVE_ID, TASK_A);
-
-    // Use previously created reservation.
-    expectTaskStillPendingQuery(TASK_A);
-    expectActiveJobFetch(TASK_A);
-    expectGetReservation(SLAVE_ID, TASK_A);
-    expectAssigned(TASK_A);
-    AssignmentCapture assignment = expectLaunchAttempt(true);
-
-    control.replay();
-
-    assertFalse(scheduler.schedule("a"));
-    assertTrue(scheduler.schedule("a"));
-    assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, assignment);
-  }
-
-  @Test
-  public void testReservationExpires() throws Exception {
-    storageUtil.expectOperations();
-
-    expectTaskStillPendingQuery(TASK_A);
-    expectActiveJobFetch(TASK_A);
-    expectLaunchAttempt(false);
-    // Reserve "a" with offerA
-    expectReservationCheck(TASK_A);
-    expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID));
-    expectAddReservation(SLAVE_ID, TASK_A);
-
-    // First attempt -> reservation is active.
-    expectTaskStillPendingQuery(TASK_B);
-    expectActiveJobFetch(TASK_B);
-    AssignmentCapture firstAssignment = expectLaunchAttempt(false);
-    expectGetReservation(SLAVE_ID, TASK_A);
-    expectReservationCheck(TASK_B);
-    expectPreemptorCall(TASK_B, Optional.absent());
-
-    // Status changed -> reservation removed.
-    reservations.remove(SLAVE_ID, TaskGroupKey.from(TASK_A.getAssignedTask().getTask()));
-
-    // Second attempt -> reservation expires.
-    expectGetNoReservation(SLAVE_ID);
-    expectTaskStillPendingQuery(TASK_B);
-    expectActiveJobFetch(TASK_B);
-    AssignmentCapture secondAssignment = expectLaunchAttempt(true);
-    expectAssigned(TASK_B);
-
-    control.replay();
-
-    assertFalse(scheduler.schedule("a"));
-    assertFalse(scheduler.schedule("b"));
-    assignAndAssert(Result.FAILURE, GROUP_B, OFFER, firstAssignment);
-
-    eventSink.post(TaskStateChange.transition(assign(TASK_A, SLAVE_ID), PENDING));
-    assertTrue(scheduler.schedule("b"));
-    assignAndAssert(Result.SUCCESS, GROUP_B, OFFER, secondAssignment);
-  }
-
-  @Test
-  public void testReservationUnusable() throws Exception {
-    storageUtil.expectOperations();
-
-    expectTaskStillPendingQuery(TASK_A);
-    expectLaunchAttempt(false);
-    expect(reservations.getByValue(TaskGroupKey.from(TASK_A.getAssignedTask().getTask())))
-        .andReturn(ImmutableSet.of(SLAVE_ID));
-
-    control.replay();
-
-    assertFalse(scheduler.schedule("a"));
-  }
-
-  @Test
-  public void testNonPendingIgnored() throws Exception {
-    control.replay();
-
-    eventSink.post(TaskStateChange.transition(TASK_A, RUNNING));
-  }
-
-  @Test
-  public void testPendingDeletedHandled() throws Exception {
-    control.replay();
-
-    IScheduledTask task = IScheduledTask.build(TASK_A.newBuilder().setStatus(PENDING));
-    eventSink.post(TaskStateChange.transition(task, PENDING));
-  }
-
-  @Test
-  public void testIgnoresThrottledTasks() throws Exception {
-    // Ensures that tasks in THROTTLED state are not considered part of the active job state passed
-    // to the assigner function.
-
-    Storage memStorage = DbUtil.createStorage();
-
-    Injector injector = getInjector(memStorage);
-    scheduler = injector.getInstance(TaskScheduler.class);
-    eventSink = PubsubTestUtil.startPubsub(injector);
-
-    ScheduledTask builder = TASK_A.newBuilder();
-    final IScheduledTask taskA = IScheduledTask.build(builder.setStatus(PENDING));
-    builder.getAssignedTask().setTaskId("b");
-    final IScheduledTask taskB = IScheduledTask.build(builder.setStatus(THROTTLED));
-
-    memStorage.write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider store) {
-        store.getUnsafeTaskStore().saveTasks(ImmutableSet.of(taskA, taskB));
-      }
-    });
-
-    expectGetNoReservation(SLAVE_ID);
-    AssignmentCapture assignment = expectLaunchAttempt(true);
-    expect(assigner.maybeAssign(
-        EasyMock.anyObject(),
-        eq(OFFER),
-        eq(new ResourceRequest(taskA.getAssignedTask().getTask(), EMPTY)),
-        eq(Tasks.id(taskA)))).andReturn(Assignment.success(TaskInfo.getDefaultInstance()));
-
-    control.replay();
-
-    assertTrue(scheduler.schedule(Tasks.id(taskA)));
-    assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, assignment);
-  }
-
-  private static class AssignmentCapture {
-    public Capture<Function<HostOffer, Assignment>> assigner = createCapture();
-    public Capture<TaskGroupKey> groupKey = createCapture();
-  }
-
-  private void expectPreemptorCall(IScheduledTask task, Optional<String> result) {
-    expect(preemptor.attemptPreemptionFor(
-        task.getAssignedTask(),
-        EMPTY,
-        storageUtil.mutableStoreProvider)).andReturn(result);
-  }
-
-  private AssignmentCapture expectLaunchAttempt(boolean taskLaunched)
-      throws OfferManager.LaunchException {
-
-    AssignmentCapture capture = new AssignmentCapture();
-    expect(offerManager.launchFirst(capture(capture.assigner), capture(capture.groupKey)))
-        .andReturn(taskLaunched);
-    return capture;
-  }
-
-  private IScheduledTask assign(IScheduledTask task, String slaveId) {
-    ScheduledTask result = task.newBuilder();
-    result.getAssignedTask().setSlaveId(slaveId);
-    return IScheduledTask.build(result);
-  }
-
-  private void assignAndAssert(
-      Result result,
-      TaskGroupKey groupKey,
-      HostOffer offer,
-      AssignmentCapture capture) {
-
-    assertEquals(result, capture.assigner.getValue().apply(offer).getResult());
-    assertEquals(groupKey, capture.groupKey.getValue());
-  }
-
-  private void expectActiveJobFetch(IScheduledTask task) {
-    storageUtil.expectTaskFetch(
-        Query.jobScoped(Tasks.SCHEDULED_TO_JOB_KEY.apply(task))
-            .byStatus(Tasks.SLAVE_ASSIGNED_STATES),
-        ImmutableSet.of());
-  }
-
-  private void expectAddReservation(String slaveId, IScheduledTask task) {
-    reservations.put(slaveId, TaskGroupKey.from(task.getAssignedTask().getTask()));
-  }
-
-  private IExpectationSetters<?> expectGetReservation(String slaveId, IScheduledTask task) {
-    return expect(reservations.get(slaveId))
-        .andReturn(Optional.of(TaskGroupKey.from(task.getAssignedTask().getTask())));
-  }
-
-  private IExpectationSetters<?> expectGetNoReservation(String slaveId) {
-    return expect(reservations.get(slaveId)).andReturn(Optional.absent());
-  }
-
-  private IExpectationSetters<?> expectReservationCheck(IScheduledTask task) {
-    return expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())))
-        .andReturn(ImmutableSet.of());
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
deleted file mode 100644
index ed15401..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ /dev/null
@@ -1,669 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.RateLimiter;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.BackoffStrategy;
-
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.async.OfferManager.OfferManagerImpl;
-import org.apache.aurora.scheduler.async.OfferManager.OfferReturnDelay;
-import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
-import org.apache.aurora.scheduler.async.preemptor.BiCache;
-import org.apache.aurora.scheduler.async.preemptor.Preemptor;
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.base.TaskTestUtil;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.mesos.Driver;
-import org.apache.aurora.scheduler.state.MaintenanceController;
-import org.apache.aurora.scheduler.state.StateChangeResult;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.state.TaskAssigner;
-import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.StorageException;
-import org.apache.aurora.scheduler.storage.TaskStore;
-import org.apache.aurora.scheduler.storage.db.DbUtil;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.easymock.IExpectationSetters;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.MaintenanceMode.DRAINED;
-import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
-import static org.apache.aurora.gen.MaintenanceMode.NONE;
-import static org.apache.aurora.gen.MaintenanceMode.SCHEDULED;
-import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
-import static org.apache.aurora.gen.ScheduleStatus.INIT;
-import static org.apache.aurora.gen.ScheduleStatus.KILLED;
-import static org.apache.aurora.gen.ScheduleStatus.LOST;
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
-import static org.apache.mesos.Protos.Offer;
-import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.isA;
-import static org.junit.Assert.assertEquals;
-
-/**
- * TODO(wfarner): Break this test up to independently test TaskSchedulerImpl and OfferQueueImpl.
- */
-public class TaskSchedulerTest extends EasyMockTest {
-
-  private static final long FIRST_SCHEDULE_DELAY_MS = 1L;
-
-  private static final HostOffer OFFER_A =  makeOffer("OFFER_A", "HOST_A", NONE);
-  private static final HostOffer OFFER_B = makeOffer("OFFER_B", "HOST_B", SCHEDULED);
-  private static final HostOffer OFFER_C = makeOffer("OFFER_C", "HOST_C", DRAINING);
-  private static final HostOffer OFFER_D = makeOffer("OFFER_D", "HOST_D", DRAINED);
-  private static final String SLAVE_A = OFFER_A.getOffer().getSlaveId().getValue();
-  private static final String SLAVE_B = OFFER_B.getOffer().getSlaveId().getValue();
-  private static final String SLAVE_C = OFFER_C.getOffer().getSlaveId().getValue();
-
-  private Storage storage;
-
-  private MaintenanceController maintenance;
-  private StateManager stateManager;
-  private TaskAssigner assigner;
-  private BackoffStrategy retryStrategy;
-  private Driver driver;
-  private ScheduledExecutorService executor;
-  private ScheduledFuture<?> future;
-  private OfferReturnDelay returnDelay;
-  private OfferManager offerManager;
-  private TaskGroups taskGroups;
-  private RescheduleCalculator rescheduleCalculator;
-  private Preemptor preemptor;
-  private BiCache<String, TaskGroupKey> reservations;
-
-  @Before
-  public void setUp() {
-    storage = DbUtil.createStorage();
-    maintenance = createMock(MaintenanceController.class);
-    stateManager = createMock(StateManager.class);
-    assigner = createMock(TaskAssigner.class);
-    retryStrategy = createMock(BackoffStrategy.class);
-    driver = createMock(Driver.class);
-    executor = createMock(ScheduledExecutorService.class);
-    future = createMock(ScheduledFuture.class);
-    returnDelay = createMock(OfferReturnDelay.class);
-    rescheduleCalculator = createMock(RescheduleCalculator.class);
-    preemptor = createMock(Preemptor.class);
-    reservations = createMock(new Clazz<BiCache<String, TaskGroupKey>>() { });
-  }
-
-  private void replayAndCreateScheduler() {
-    control.replay();
-    offerManager = new OfferManagerImpl(driver, returnDelay, executor);
-    TaskScheduler scheduler = new TaskSchedulerImpl(storage,
-        stateManager,
-        assigner,
-        offerManager,
-        preemptor,
-        reservations);
-    taskGroups = new TaskGroups(
-        executor,
-        Amount.of(FIRST_SCHEDULE_DELAY_MS, Time.MILLISECONDS),
-        retryStrategy,
-        RateLimiter.create(100),
-        scheduler,
-        rescheduleCalculator);
-  }
-
-  private Capture<Runnable> expectOffer() {
-    return expectOfferDeclineIn(10);
-  }
-
-  private Capture<Runnable> expectOfferDeclineIn(long delayMillis) {
-    expect(returnDelay.get()).andReturn(Amount.of(delayMillis, Time.MILLISECONDS));
-    Capture<Runnable> runnable = createCapture();
-    executor.schedule(capture(runnable), eq(delayMillis), eq(TimeUnit.MILLISECONDS));
-    expectLastCall().andReturn(createMock(ScheduledFuture.class));
-    return runnable;
-  }
-
-  private void changeState(
-      IScheduledTask task,
-      ScheduleStatus oldState,
-      ScheduleStatus newState) {
-
-    final IScheduledTask copy = IScheduledTask.build(task.newBuilder().setStatus(newState));
-    // Insert the task if it doesn't already exist.
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider storeProvider) {
-        TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
-        if (Iterables.isEmpty(taskStore.fetchTasks(Query.taskScoped(Tasks.id(copy))))) {
-          taskStore.saveTasks(ImmutableSet.of(copy));
-        }
-      }
-    });
-    taskGroups.taskChangedState(TaskStateChange.transition(copy, oldState));
-  }
-
-  private Capture<Runnable> expectTaskRetryIn(long penaltyMs) {
-    Capture<Runnable> capture = createCapture();
-    executor.schedule(
-        capture(capture),
-        eq(penaltyMs),
-        eq(TimeUnit.MILLISECONDS));
-    expectLastCall().andReturn(future);
-    return capture;
-  }
-
-  private Capture<Runnable> expectTaskGroupBackoff(long previousPenaltyMs, long nextPenaltyMs) {
-    expect(retryStrategy.calculateBackoffMs(previousPenaltyMs)).andReturn(nextPenaltyMs);
-    return expectTaskRetryIn(nextPenaltyMs);
-  }
-
-  @Test
-  public void testNoTasks() {
-    expectAnyMaintenanceCalls();
-    expectOfferDeclineIn(10);
-    expectOfferDeclineIn(10);
-
-    replayAndCreateScheduler();
-
-    offerManager.addOffer(OFFER_A);
-    offerManager.addOffer(OFFER_B);
-  }
-
-  @Test
-  public void testNoOffers() {
-    Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-    expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
-    IScheduledTask task = createTask("a");
-    expectPreemptorCall(task.getAssignedTask());
-    expectReservationCheck(task);
-
-    replayAndCreateScheduler();
-
-    changeState(task, INIT, PENDING);
-    timeoutCapture.getValue().run();
-  }
-
-  private IScheduledTask createTask(String taskId) {
-    return createTask(taskId, null);
-  }
-
-  private IScheduledTask createTask(String taskId, @Nullable ScheduleStatus status) {
-    return setStatus(makeTask(taskId, TaskTestUtil.JOB), status);
-  }
-
-  private IScheduledTask setStatus(IScheduledTask task, @Nullable ScheduleStatus status) {
-    return IScheduledTask.build(task.newBuilder().setStatus(status));
-  }
-
-  @Test
-  public void testLoadFromStorage() {
-    final IScheduledTask a = createTask("a", KILLED);
-    final IScheduledTask b = createTask("b", PENDING);
-    final IScheduledTask c = createTask("c", RUNNING);
-
-    expect(rescheduleCalculator.getStartupScheduleDelayMs(b)).andReturn(10L);
-    expectTaskRetryIn(10);
-
-    replayAndCreateScheduler();
-
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider store) {
-        store.getUnsafeTaskStore().saveTasks(ImmutableSet.of(a, b, c));
-      }
-    });
-    for (IScheduledTask task : ImmutableList.of(a, b, c)) {
-      taskGroups.taskChangedState(TaskStateChange.initialized(task));
-    }
-    changeState(c, RUNNING, FINISHED);
-  }
-
-  @Test
-  public void testTaskMissing() {
-    Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-
-    replayAndCreateScheduler();
-
-    taskGroups.taskChangedState(TaskStateChange.transition(createTask("a", PENDING), INIT));
-    timeoutCapture.getValue().run();
-  }
-
-  private IExpectationSetters<Assignment> expectMaybeAssign(
-      HostOffer offer,
-      IScheduledTask task,
-      AttributeAggregate jobAggregate) {
-
-    return expect(assigner.maybeAssign(
-        EasyMock.anyObject(),
-        eq(offer),
-        eq(new ResourceRequest(task.getAssignedTask().getTask(), jobAggregate)),
-        eq(Tasks.id(task))));
-  }
-
-  private IExpectationSetters<?> expectNoReservation(String slaveId) {
-    return expect(reservations.get(slaveId)).andReturn(Optional.absent());
-  }
-
-  private IExpectationSetters<?> expectReservationCheck(IScheduledTask task) {
-    return expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())))
-        .andReturn(ImmutableSet.of());
-  }
-
-  @Test
-  public void testTaskAssigned() {
-    expectAnyMaintenanceCalls();
-    expectOfferDeclineIn(10);
-
-    IScheduledTask taskA = createTask("a", PENDING);
-    TaskInfo mesosTask = makeTaskInfo(taskA);
-
-    Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-    expectNoReservation(SLAVE_A).times(2);
-    expectReservationCheck(taskA);
-    expectMaybeAssign(OFFER_A, taskA, EMPTY).andReturn(Assignment.failure());
-    expectPreemptorCall(taskA.getAssignedTask());
-
-    Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
-    expectMaybeAssign(OFFER_A, taskA, EMPTY).andReturn(Assignment.success(mesosTask));
-    driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
-
-    Capture<Runnable> timeoutCapture3 = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-    expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
-    IScheduledTask taskB = createTask("b");
-    expectReservationCheck(taskB);
-    expectPreemptorCall(taskB.getAssignedTask());
-
-    replayAndCreateScheduler();
-
-    offerManager.addOffer(OFFER_A);
-    changeState(taskA, INIT, PENDING);
-    timeoutCapture.getValue().run();
-    timeoutCapture2.getValue().run();
-
-    // Ensure the offer was consumed.
-    changeState(taskB, INIT, PENDING);
-    timeoutCapture3.getValue().run();
-  }
-
-  @Test
-  public void testDriverNotReady() {
-    IScheduledTask task = createTask("a", PENDING);
-    TaskInfo mesosTask = TaskInfo.newBuilder()
-        .setName(Tasks.id(task))
-        .setTaskId(TaskID.newBuilder().setValue(Tasks.id(task)))
-        .setSlaveId(SlaveID.newBuilder().setValue("slaveId"))
-        .build();
-
-    Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-    expectAnyMaintenanceCalls();
-    expectOfferDeclineIn(10);
-    expectNoReservation(SLAVE_A);
-    expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.success(mesosTask));
-    driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
-    expectLastCall().andThrow(new IllegalStateException("Driver not ready."));
-    expect(stateManager.changeState(
-        EasyMock.anyObject(),
-        eq("a"),
-        eq(Optional.of(PENDING)),
-        eq(LOST),
-        eq(TaskSchedulerImpl.LAUNCH_FAILED_MSG)))
-        .andReturn(StateChangeResult.SUCCESS);
-
-    replayAndCreateScheduler();
-
-    changeState(task, INIT, PENDING);
-    offerManager.addOffer(OFFER_A);
-    timeoutCapture.getValue().run();
-  }
-
-  @Test
-  public void testStorageException() {
-    IScheduledTask task = createTask("a", PENDING);
-    TaskInfo mesosTask = TaskInfo.newBuilder()
-        .setName(Tasks.id(task))
-        .setTaskId(TaskID.newBuilder().setValue(Tasks.id(task)))
-        .setSlaveId(SlaveID.newBuilder().setValue("slaveId"))
-        .build();
-
-    Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-    expectAnyMaintenanceCalls();
-    expectOfferDeclineIn(10);
-    expectNoReservation(SLAVE_A).times(2);
-    expectMaybeAssign(OFFER_A, task, EMPTY).andThrow(new StorageException("Injected failure."));
-
-    Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
-    expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.success(mesosTask));
-    driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
-    expectLastCall();
-
-    replayAndCreateScheduler();
-
-    changeState(task, INIT, PENDING);
-    offerManager.addOffer(OFFER_A);
-    timeoutCapture.getValue().run();
-    timeoutCapture2.getValue().run();
-  }
-
-  @Test
-  public void testExpiration() {
-    IScheduledTask task = createTask("a", PENDING);
-
-    Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-    Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
-    expectAnyMaintenanceCalls();
-    expectNoReservation(SLAVE_A);
-    expectReservationCheck(task).times(2);
-    expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure());
-    Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
-    expectPreemptorCall(task.getAssignedTask());
-    driver.declineOffer(OFFER_A.getOffer().getId());
-    expectTaskGroupBackoff(10, 20);
-    expectPreemptorCall(task.getAssignedTask());
-
-    replayAndCreateScheduler();
-
-    changeState(task, INIT, PENDING);
-    offerManager.addOffer(OFFER_A);
-    timeoutCapture.getValue().run();
-    offerExpirationCapture.getValue().run();
-    timeoutCapture2.getValue().run();
-  }
-
-  @Test
-  public void testOneOfferPerSlave() {
-    expectAnyMaintenanceCalls();
-    Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
-
-    HostOffer offerAB = new HostOffer(
-        Offers.makeOffer("OFFER_B").toBuilder().setSlaveId(OFFER_A.getOffer().getSlaveId()).build(),
-        IHostAttributes.build(new HostAttributes()));
-
-    driver.declineOffer(OFFER_A.getOffer().getId());
-    driver.declineOffer(offerAB.getOffer().getId());
-
-    replayAndCreateScheduler();
-
-    offerManager.addOffer(OFFER_A);
-    offerManager.addOffer(offerAB);
-    offerExpirationCapture.getValue().run();
-  }
-
-  @Test
-  public void testDontDeclineAcceptedOffer() throws OfferManager.LaunchException {
-    expectAnyMaintenanceCalls();
-    Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
-
-    Function<HostOffer, Assignment> offerAcceptor =
-        createMock(new Clazz<Function<HostOffer, Assignment>>() { });
-    final TaskInfo taskInfo = TaskInfo.getDefaultInstance();
-    expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(taskInfo));
-    driver.launchTask(OFFER_A.getOffer().getId(), taskInfo);
-
-    replayAndCreateScheduler();
-
-    offerManager.addOffer(OFFER_A);
-    offerManager.launchFirst(offerAcceptor, TaskGroupKey.from(ITaskConfig.build(new TaskConfig())));
-    offerExpirationCapture.getValue().run();
-  }
-
-  @Test
-  public void testBasicMaintenancePreferences() {
-    expectOffer();
-    expectOffer();
-    expectOffer();
-    expectOffer();
-
-    IScheduledTask taskA = createTask("A", PENDING);
-    TaskInfo mesosTaskA = makeTaskInfo(taskA);
-    expectNoReservation(SLAVE_A);
-    expectMaybeAssign(OFFER_A, taskA, EMPTY).andReturn(Assignment.success(mesosTaskA));
-    driver.launchTask(OFFER_A.getOffer().getId(), mesosTaskA);
-    Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-
-    IScheduledTask taskB = createTask("B", PENDING);
-    TaskInfo mesosTaskB = makeTaskInfo(taskB);
-    expectNoReservation(SLAVE_B);
-    expectMaybeAssign(OFFER_B, taskB, EMPTY).andReturn(Assignment.success(mesosTaskB));
-    driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskB);
-    Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-
-    replayAndCreateScheduler();
-
-    offerManager.addOffer(OFFER_D);
-    offerManager.addOffer(OFFER_C);
-    offerManager.addOffer(OFFER_B);
-    offerManager.addOffer(OFFER_A);
-
-    changeState(taskA, INIT, PENDING);
-    captureA.getValue().run();
-
-    changeState(taskB, INIT, PENDING);
-    captureB.getValue().run();
-  }
-
-  @Test
-  public void testChangingMaintenancePreferences() {
-    expectOffer();
-    expectOffer();
-    expectOffer();
-
-    IScheduledTask taskA = createTask("A", PENDING);
-    TaskInfo mesosTaskA = makeTaskInfo(taskA);
-    expectNoReservation(SLAVE_B);
-    expectMaybeAssign(OFFER_B, taskA, EMPTY).andReturn(Assignment.success(mesosTaskA));
-    driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskA);
-    Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-
-    IScheduledTask taskB = createTask("B", PENDING);
-    TaskInfo mesosTaskB = makeTaskInfo(taskB);
-    HostOffer updatedOfferC = new HostOffer(
-        OFFER_C.getOffer(),
-        IHostAttributes.build(OFFER_C.getAttributes().newBuilder().setMode(NONE)));
-    expectNoReservation(SLAVE_C);
-    expectMaybeAssign(updatedOfferC, taskB, EMPTY).andReturn(Assignment.success(mesosTaskB));
-    driver.launchTask(OFFER_C.getOffer().getId(), mesosTaskB);
-    Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-
-    replayAndCreateScheduler();
-
-    offerManager.addOffer(OFFER_A);
-    offerManager.addOffer(OFFER_B);
-    offerManager.addOffer(OFFER_C);
-
-    // Initially, we'd expect the offers to be consumed in order (A, B), with (C) unschedulable
-
-    // Expected order now (B), with (C, A) unschedulable
-    changeHostMaintenanceState(OFFER_A.getAttributes(), DRAINING);
-    changeState(taskA, INIT, PENDING);
-    captureA.getValue().run();
-
-    // Expected order now (C), with (A) unschedulable and (B) already consumed
-    changeHostMaintenanceState(OFFER_C.getAttributes(), NONE);
-    changeState(taskB, INIT, PENDING);
-    captureB.getValue().run();
-  }
-
-  private Capture<String> expectTaskScheduled(IScheduledTask task) {
-    TaskInfo mesosTask = makeTaskInfo(task);
-    Capture<String> taskId = createCapture();
-    expect(assigner.maybeAssign(
-        EasyMock.anyObject(),
-        EasyMock.anyObject(),
-        EasyMock.anyObject(),
-        capture(taskId))).andReturn(Assignment.success(mesosTask));
-    driver.launchTask(EasyMock.anyObject(), eq(mesosTask));
-    return taskId;
-  }
-
-  @Test
-  public void testResistsStarvation() {
-    // TODO(wfarner): This test requires intimate knowledge of the way futures are used inside
-    // TaskScheduler.  It's time to test using a real ScheduledExecutorService.
-
-    expectAnyMaintenanceCalls();
-
-    IScheduledTask jobA0 = setStatus(makeTask("a0", JobKeys.from("a", "b", "c")), PENDING);
-
-    ScheduledTask jobA1Builder = jobA0.newBuilder();
-    jobA1Builder.getAssignedTask().setTaskId("a1");
-    jobA1Builder.getAssignedTask().setInstanceId(1);
-    IScheduledTask jobA1 = IScheduledTask.build(jobA1Builder);
-
-    ScheduledTask jobA2Builder = jobA0.newBuilder();
-    jobA2Builder.getAssignedTask().setTaskId("a2");
-    jobA2Builder.getAssignedTask().setInstanceId(2);
-    IScheduledTask jobA2 = IScheduledTask.build(jobA2Builder);
-
-    IScheduledTask jobB0 = setStatus(makeTask("b0", JobKeys.from("d", "e", "f")), PENDING);
-
-    expectNoReservation(SLAVE_A);
-    expectNoReservation(SLAVE_B);
-
-    expectOfferDeclineIn(10);
-    expectOfferDeclineIn(10);
-    expectOfferDeclineIn(10);
-    expectOfferDeclineIn(10);
-
-    Capture<Runnable> timeoutA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-    Capture<Runnable> timeoutB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-
-    Capture<String> firstScheduled = expectTaskScheduled(jobA0);
-    Capture<String> secondScheduled = expectTaskScheduled(jobB0);
-
-    // Expect another watch of the task group for job A.
-    expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-
-    replayAndCreateScheduler();
-
-    offerManager.addOffer(OFFER_A);
-    offerManager.addOffer(OFFER_B);
-    offerManager.addOffer(OFFER_C);
-    offerManager.addOffer(OFFER_D);
-    changeState(jobA0, INIT, PENDING);
-    changeState(jobA1, INIT, PENDING);
-    changeState(jobA2, INIT, PENDING);
-    changeState(jobB0, INIT, PENDING);
-    timeoutA.getValue().run();
-    timeoutB.getValue().run();
-    assertEquals(
-        ImmutableSet.of(Tasks.id(jobA0), Tasks.id(jobB0)),
-        ImmutableSet.of(firstScheduled.getValue(), secondScheduled.getValue()));
-  }
-
-  @Test
-  public void testTaskDeleted() {
-    expectAnyMaintenanceCalls();
-    expectOfferDeclineIn(10);
-
-    final IScheduledTask task = createTask("a", PENDING);
-
-    Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-    expectNoReservation(SLAVE_A);
-    expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure());
-    expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 20);
-    expectReservationCheck(task);
-    expectPreemptorCall(task.getAssignedTask());
-
-    replayAndCreateScheduler();
-
-    offerManager.addOffer(OFFER_A);
-    changeState(task, INIT, PENDING);
-    timeoutCapture.getValue().run();
-
-    // Ensure the offer was consumed.
-    changeState(task, INIT, PENDING);
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider storeProvider) {
-        storeProvider.getUnsafeTaskStore().deleteTasks(Tasks.ids(task));
-      }
-    });
-    taskGroups.tasksDeleted(new TasksDeleted(ImmutableSet.of(task)));
-    timeoutCapture.getValue().run();
-  }
-
-  private TaskInfo makeTaskInfo(IScheduledTask task) {
-    return TaskInfo.newBuilder()
-        .setName(Tasks.id(task))
-        .setTaskId(TaskID.newBuilder().setValue(Tasks.id(task)))
-        .setSlaveId(SlaveID.newBuilder().setValue("slave-id" + task.toString()))
-        .build();
-  }
-
-  private void expectAnyMaintenanceCalls() {
-    expect(maintenance.getMode(isA(String.class))).andReturn(NONE).anyTimes();
-  }
-
-  private void changeHostMaintenanceState(IHostAttributes attributes, MaintenanceMode mode) {
-    offerManager.hostAttributesChanged(new PubsubEvent.HostAttributesChanged(
-        IHostAttributes.build(attributes.newBuilder().setMode(mode))));
-  }
-
-  private static HostOffer makeOffer(String offerId, String hostName, MaintenanceMode mode) {
-    Offer offer = Offers.makeOffer(offerId, hostName);
-    return new HostOffer(
-        offer,
-        IHostAttributes.build(new HostAttributes()
-            .setHost(hostName)
-            .setSlaveId(offer.getSlaveId().getValue())
-            .setAttributes(ImmutableSet.of())
-            .setMode(mode)));
-  }
-
-  private void expectPreemptorCall(IAssignedTask task) {
-    expect(preemptor.attemptPreemptionFor(
-        eq(task),
-        eq(EMPTY),
-        EasyMock.anyObject())).andReturn(Optional.absent());
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java
deleted file mode 100644
index cfa9d81..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskThrottlerTest.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.testing.FakeClock;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.state.StateChangeResult;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.easymock.Capture;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.ScheduleStatus.INIT;
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
-import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-
-public class TaskThrottlerTest extends EasyMockTest {
-
-  private RescheduleCalculator rescheduleCalculator;
-  private FakeClock clock;
-  private ScheduledExecutorService executor;
-  private StorageTestUtil storageUtil;
-  private StateManager stateManager;
-  private TaskThrottler throttler;
-
-  @Before
-  public void setUp() throws Exception {
-    rescheduleCalculator = createMock(RescheduleCalculator.class);
-    clock = new FakeClock();
-    executor = createMock(ScheduledExecutorService.class);
-    storageUtil = new StorageTestUtil(this);
-    storageUtil.expectOperations();
-    stateManager = createMock(StateManager.class);
-    throttler = new TaskThrottler(
-        rescheduleCalculator,
-        clock,
-        executor,
-        storageUtil.storage,
-        stateManager);
-  }
-
-  @Test
-  public void testIgnoresNonThrottledTasks() {
-    control.replay();
-
-    throttler.taskChangedState(TaskStateChange.transition(makeTask("a", PENDING), INIT));
-    throttler.taskChangedState(TaskStateChange.transition(makeTask("a", RUNNING), PENDING));
-  }
-
-  @Test
-  public void testThrottledTask() {
-    IScheduledTask task = makeTask("a", THROTTLED);
-
-    long penaltyMs = 100;
-
-    expect(rescheduleCalculator.getFlappingPenaltyMs(task)).andReturn(penaltyMs);
-    Capture<Runnable> stateChangeCapture = expectThrottled(penaltyMs);
-    expectMovedToPending(task);
-
-    control.replay();
-
-    throttler.taskChangedState(TaskStateChange.transition(task, INIT));
-    stateChangeCapture.getValue().run();
-  }
-
-  @Test
-  public void testThrottledTaskReady() {
-    // Ensures that a sane delay is used when the task's penalty was already expired when
-    // the -> THROTTLED transition occurred (such as in the event of a scheduler failover).
-
-    IScheduledTask task = makeTask("a", THROTTLED);
-
-    long penaltyMs = 100;
-
-    expect(rescheduleCalculator.getFlappingPenaltyMs(task)).andReturn(penaltyMs);
-    Capture<Runnable> stateChangeCapture = expectThrottled(0);
-    expectMovedToPending(task);
-
-    control.replay();
-
-    clock.advance(Amount.of(1L, Time.HOURS));
-    throttler.taskChangedState(TaskStateChange.transition(task, INIT));
-    stateChangeCapture.getValue().run();
-  }
-
-  private Capture<Runnable> expectThrottled(long penaltyMs) {
-    Capture<Runnable> stateChangeCapture = createCapture();
-    expect(executor.schedule(
-        capture(stateChangeCapture),
-        eq(penaltyMs),
-        eq(TimeUnit.MILLISECONDS)))
-        .andReturn(null);
-    return stateChangeCapture;
-  }
-
-  private void expectMovedToPending(IScheduledTask task) {
-    expect(stateManager.changeState(
-        storageUtil.mutableStoreProvider,
-        Tasks.id(task),
-        Optional.of(THROTTLED),
-        PENDING,
-        Optional.absent()))
-        .andReturn(StateChangeResult.SUCCESS);
-  }
-
-  private IScheduledTask makeTask(String id, ScheduleStatus status) {
-    return IScheduledTask.build(new ScheduledTask()
-        .setTaskEvents(ImmutableList.of(
-            new TaskEvent()
-                .setStatus(status)
-                .setTimestamp(clock.nowMillis())))
-        .setStatus(status)
-        .setAssignedTask(new AssignedTask().setTaskId(id)));
-  }
-}


Mime
View raw message