aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [3/8] aurora git commit: Break apart async package and AsyncModule into purpose-specific equivalents.
Date Wed, 22 Jul 2015 19:40:00 GMT
http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
deleted file mode 100644
index b98a8d7..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.StatsProvider;
-import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.testing.FakeClock;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.state.StateChangeResult;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
-import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
-import static org.apache.aurora.gen.ScheduleStatus.INIT;
-import static org.apache.aurora.gen.ScheduleStatus.KILLED;
-import static org.apache.aurora.gen.ScheduleStatus.KILLING;
-import static org.apache.aurora.gen.ScheduleStatus.LOST;
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
-import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.apache.aurora.gen.ScheduleStatus.STARTING;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.assertEquals;
-
-public class TaskTimeoutTest extends EasyMockTest {
-
-  private static final String TASK_ID = "task_id";
-  private static final Amount<Long, Time> TIMEOUT = Amount.of(1L, Time.MINUTES);
-
-  private AtomicLong timedOutTaskCounter;
-  private ScheduledExecutorService executor;
-  private StorageTestUtil storageUtil;
-  private ScheduledFuture<?> future;
-  private StateManager stateManager;
-  private FakeClock clock;
-  private TaskTimeout timeout;
-  private StatsProvider statsProvider;
-
-  @Before
-  public void setUp() {
-    executor = createMock(ScheduledExecutorService.class);
-    storageUtil = new StorageTestUtil(this);
-    storageUtil.expectOperations();
-    future = createMock(new Clazz<ScheduledFuture<?>>() { });
-    stateManager = createMock(StateManager.class);
-    clock = new FakeClock();
-    statsProvider = createMock(StatsProvider.class);
-    timedOutTaskCounter = new AtomicLong();
-    expect(statsProvider.makeCounter(TaskTimeout.TIMED_OUT_TASKS_COUNTER))
-        .andReturn(timedOutTaskCounter);
-  }
-
-  private void replayAndCreate() {
-    control.replay();
-    timeout = new TaskTimeout(
-        executor,
-        storageUtil.storage,
-        stateManager,
-        TIMEOUT,
-        statsProvider);
-    timeout.startAsync().awaitRunning();
-  }
-
-  private Capture<Runnable> expectTaskWatch(Amount<Long, Time> expireIn) {
-    Capture<Runnable> capture = createCapture();
-    executor.schedule(
-        EasyMock.capture(capture),
-        eq((long) expireIn.getValue()),
-        eq(expireIn.getUnit().getTimeUnit()));
-    expectLastCall().andReturn(future);
-    return capture;
-  }
-
-  private Capture<Runnable> expectTaskWatch() {
-    return expectTaskWatch(TIMEOUT);
-  }
-
-  private void changeState(String taskId, ScheduleStatus from, ScheduleStatus to) {
-    IScheduledTask task = IScheduledTask.build(new ScheduledTask()
-        .setStatus(to)
-        .setAssignedTask(new AssignedTask().setTaskId(taskId)));
-    timeout.recordStateChange(TaskStateChange.transition(task, from));
-  }
-
-  private void changeState(ScheduleStatus from, ScheduleStatus to) {
-    changeState(TASK_ID, from, to);
-  }
-
-  @Test
-  public void testNormalTransitions() {
-    expectTaskWatch();
-    expectTaskWatch();
-
-    replayAndCreate();
-
-    changeState(INIT, PENDING);
-    changeState(PENDING, ASSIGNED);
-    changeState(ASSIGNED, STARTING);
-    changeState(STARTING, RUNNING);
-    changeState(RUNNING, KILLING);
-    changeState(KILLING, KILLED);
-  }
-
-  @Test
-  public void testTransientToTransient() {
-    expectTaskWatch();
-    Capture<Runnable> killingTimeout = expectTaskWatch();
-    expect(stateManager.changeState(
-        storageUtil.mutableStoreProvider,
-        TASK_ID,
-        Optional.of(KILLING),
-        LOST,
-        TaskTimeout.TIMEOUT_MESSAGE))
-        .andReturn(StateChangeResult.SUCCESS);
-
-    replayAndCreate();
-
-    changeState(PENDING, ASSIGNED);
-    changeState(ASSIGNED, KILLING);
-    killingTimeout.getValue().run();
-  }
-
-  @Test
-  public void testTimeout() throws Exception {
-    Capture<Runnable> assignedTimeout = expectTaskWatch();
-    expect(stateManager.changeState(
-        storageUtil.mutableStoreProvider,
-        TASK_ID,
-        Optional.of(ASSIGNED),
-        LOST,
-        TaskTimeout.TIMEOUT_MESSAGE))
-        .andReturn(StateChangeResult.SUCCESS);
-
-    replayAndCreate();
-
-    changeState(INIT, PENDING);
-    changeState(PENDING, ASSIGNED);
-    assignedTimeout.getValue().run();
-    assertEquals(timedOutTaskCounter.intValue(), 1);
-  }
-
-  @Test
-  public void testTaskDeleted() throws Exception {
-    Capture<Runnable> assignedTimeout = expectTaskWatch();
-    expect(stateManager.changeState(
-        storageUtil.mutableStoreProvider,
-        TASK_ID,
-        Optional.of(KILLING),
-        LOST,
-        TaskTimeout.TIMEOUT_MESSAGE))
-        .andReturn(StateChangeResult.ILLEGAL);
-
-    replayAndCreate();
-
-    changeState(INIT, PENDING);
-    changeState(PENDING, KILLING);
-    assignedTimeout.getValue().run();
-    assertEquals(timedOutTaskCounter.intValue(), 0);
-  }
-
-  private static IScheduledTask makeTask(
-      String taskId,
-      ScheduleStatus status,
-      long stateEnteredMs) {
-
-    return IScheduledTask.build(new ScheduledTask()
-        .setStatus(status)
-        .setTaskEvents(ImmutableList.of(new TaskEvent(stateEnteredMs, status)))
-        .setAssignedTask(new AssignedTask()
-            .setTaskId(taskId)
-            .setTask(new TaskConfig())));
-  }
-
-  @Test
-  public void testStorageStart() {
-    expectTaskWatch(TIMEOUT);
-    expectTaskWatch(TIMEOUT);
-    expectTaskWatch(TIMEOUT);
-
-    replayAndCreate();
-
-    clock.setNowMillis(TIMEOUT.as(Time.MILLISECONDS) * 2);
-    for (IScheduledTask task : ImmutableList.of(
-        makeTask("a", ASSIGNED, 0),
-        makeTask("b", KILLING, TIMEOUT.as(Time.MILLISECONDS)),
-        makeTask("c", PREEMPTING, clock.nowMillis() + TIMEOUT.as(Time.MILLISECONDS)))) {
-
-      timeout.recordStateChange(TaskStateChange.initialized(task));
-    }
-
-    changeState("a", ASSIGNED, RUNNING);
-    changeState("b", KILLING, KILLED);
-    changeState("c", PREEMPTING, FINISHED);
-  }
-
-  @Test
-  public void testTimeoutWhileNotStarted() throws Exception {
-    // Since the timeout is never instructed to start, it should not attempt to transition tasks,
-    // but it should try again later.
-    Capture<Runnable> assignedTimeout = expectTaskWatch();
-    expectTaskWatch(TaskTimeout.NOT_STARTED_RETRY);
-
-    control.replay();
-    timeout = new TaskTimeout(executor, storageUtil.storage, stateManager, TIMEOUT, statsProvider);
-
-    changeState(INIT, PENDING);
-    changeState(PENDING, ASSIGNED);
-    assignedTimeout.getValue().run();
-    assertEquals(timedOutTaskCounter.intValue(), 0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/preemptor/BiCacheTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/BiCacheTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/BiCacheTest.java
deleted file mode 100644
index babc17f..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/BiCacheTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async.preemptor;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.testing.FakeClock;
-
-import org.apache.aurora.scheduler.async.preemptor.BiCache.BiCacheSettings;
-import org.apache.aurora.scheduler.testing.FakeStatsProvider;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class BiCacheTest {
-  private static final Amount<Long, Time> HOLD_DURATION = Amount.of(1L, Time.MINUTES);
-  private static final String STAT_NAME = "cache_size_stat";
-  private static final String KEY_1 = "Key 1";
-  private static final String KEY_2 = "Key 2";
-  private static final Optional<Integer> NO_VALUE = Optional.absent();
-
-  private FakeStatsProvider statsProvider;
-  private FakeClock clock;
-  private BiCache<String, Integer> biCache;
-
-  @Before
-  public void setUp() {
-    statsProvider = new FakeStatsProvider();
-    clock = new FakeClock();
-    biCache = new BiCache<>(statsProvider, new BiCacheSettings(HOLD_DURATION, STAT_NAME), clock);
-  }
-
-  @Test
-  public void testExpiration() {
-    biCache.put(KEY_1, 1);
-    assertEquals(Optional.of(1), biCache.get(KEY_1));
-    assertEquals(1L, statsProvider.getLongValue(STAT_NAME));
-
-    clock.advance(HOLD_DURATION);
-
-    assertEquals(NO_VALUE, biCache.get(KEY_1));
-    assertEquals(ImmutableSet.of(), biCache.getByValue(1));
-    assertEquals(0L, statsProvider.getLongValue(STAT_NAME));
-  }
-
-  @Test
-  public void testRemoval() {
-    biCache.put(KEY_1, 1);
-    assertEquals(1L, statsProvider.getLongValue(STAT_NAME));
-    assertEquals(Optional.of(1), biCache.get(KEY_1));
-    biCache.remove(KEY_1, 1);
-    assertEquals(NO_VALUE, biCache.get(KEY_1));
-    assertEquals(0L, statsProvider.getLongValue(STAT_NAME));
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testRemovalWithNullKey() {
-    biCache.remove(null, 1);
-  }
-
-  @Test
-  public void testDifferentKeysIdenticalValues() {
-    biCache.put(KEY_1, 1);
-    biCache.put(KEY_2, 1);
-    assertEquals(2L, statsProvider.getLongValue(STAT_NAME));
-
-    assertEquals(Optional.of(1), biCache.get(KEY_1));
-    assertEquals(Optional.of(1), biCache.get(KEY_2));
-    assertEquals(ImmutableSet.of(KEY_1, KEY_2), biCache.getByValue(1));
-
-    biCache.remove(KEY_1, 1);
-    assertEquals(NO_VALUE, biCache.get(KEY_1));
-    assertEquals(Optional.of(1), biCache.get(KEY_2));
-    assertEquals(ImmutableSet.of(KEY_2), biCache.getByValue(1));
-    assertEquals(1L, statsProvider.getLongValue(STAT_NAME));
-
-    clock.advance(HOLD_DURATION);
-    assertEquals(NO_VALUE, biCache.get(KEY_1));
-    assertEquals(NO_VALUE, biCache.get(KEY_2));
-    assertEquals(ImmutableSet.of(), biCache.getByValue(1));
-    assertEquals(0L, statsProvider.getLongValue(STAT_NAME));
-  }
-
-  @Test
-  public void testIdenticalKeysDifferentValues() {
-    biCache.put(KEY_1, 1);
-    biCache.put(KEY_1, 2);
-    assertEquals(Optional.of(2), biCache.get(KEY_1));
-    assertEquals(ImmutableSet.of(), biCache.getByValue(1));
-    assertEquals(ImmutableSet.of(KEY_1), biCache.getByValue(2));
-    assertEquals(1L, statsProvider.getLongValue(STAT_NAME));
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/preemptor/ClusterStateImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/ClusterStateImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/ClusterStateImplTest.java
deleted file mode 100644
index 1572a08..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/ClusterStateImplTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async.preemptor;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableMultimap;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.JobKey;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
-import static org.apache.aurora.gen.ScheduleStatus.FAILED;
-import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
-import static org.apache.aurora.gen.ScheduleStatus.KILLED;
-import static org.apache.aurora.gen.ScheduleStatus.KILLING;
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
-import static org.junit.Assert.assertEquals;
-
-public class ClusterStateImplTest {
-
-  private ClusterStateImpl state;
-
-  @Before
-  public void setUp() {
-    state = new ClusterStateImpl();
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testImmutable() {
-    state.getSlavesToActiveTasks().clear();
-  }
-
-  @Test
-  public void testTaskLifecycle() {
-    IAssignedTask a = makeTask("a", "s1");
-
-    assertVictims();
-    changeState(a, THROTTLED);
-    assertVictims();
-    changeState(a, PENDING);
-    assertVictims();
-    changeState(a, ASSIGNED);
-    assertVictims(a);
-    changeState(a, RUNNING);
-    assertVictims(a);
-    changeState(a, KILLING);
-    assertVictims(a);
-    changeState(a, FINISHED);
-    assertVictims();
-  }
-
-  @Test
-  public void testTaskChangesSlaves() {
-    // We do not intend to handle the case of an external failure leading to the same task ID
-    // on a different slave.
-    IAssignedTask a = makeTask("a", "s1");
-    IAssignedTask a1 = makeTask("a", "s2");
-    changeState(a, RUNNING);
-    changeState(a1, RUNNING);
-    assertVictims(a, a1);
-  }
-
-  @Test
-  public void testMultipleTasks() {
-    IAssignedTask a = makeTask("a", "s1");
-    IAssignedTask b = makeTask("b", "s1");
-    IAssignedTask c = makeTask("c", "s2");
-    IAssignedTask d = makeTask("d", "s3");
-    IAssignedTask e = makeTask("e", "s3");
-    IAssignedTask f = makeTask("f", "s1");
-    changeState(a, RUNNING);
-    assertVictims(a);
-    changeState(b, RUNNING);
-    assertVictims(a, b);
-    changeState(c, RUNNING);
-    assertVictims(a, b, c);
-    changeState(d, RUNNING);
-    assertVictims(a, b, c, d);
-    changeState(e, RUNNING);
-    assertVictims(a, b, c, d, e);
-    changeState(c, FINISHED);
-    assertVictims(a, b, d, e);
-    changeState(a, FAILED);
-    changeState(e, KILLED);
-    assertVictims(b, d);
-    changeState(f, RUNNING);
-    assertVictims(b, d, f);
-  }
-
-  private void assertVictims(IAssignedTask... tasks) {
-    ImmutableMultimap.Builder<String, PreemptionVictim> victims = ImmutableMultimap.builder();
-    for (IAssignedTask task : tasks) {
-      victims.put(task.getSlaveId(), PreemptionVictim.fromTask(task));
-    }
-    assertEquals(HashMultimap.create(victims.build()), state.getSlavesToActiveTasks());
-  }
-
-  private IAssignedTask makeTask(String taskId, String slaveId) {
-    return IAssignedTask.build(new AssignedTask()
-        .setTaskId(taskId)
-        .setSlaveId(slaveId)
-        .setSlaveHost(slaveId + "host")
-        .setTask(new TaskConfig().setJob(new JobKey("role", "env", "job"))));
-  }
-
-  private void changeState(IAssignedTask assignedTask, ScheduleStatus status) {
-    IScheduledTask task = IScheduledTask.build(new ScheduledTask()
-        .setStatus(status)
-        .setAssignedTask(assignedTask.newBuilder()));
-    state.taskChangedState(TaskStateChange.transition(task, ScheduleStatus.INIT));
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java
deleted file mode 100644
index a0dbb25..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async.preemptor;
-
-import java.util.Arrays;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.testing.FakeClock;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.JobKey;
-import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.async.OfferManager;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.stats.CachedCounters;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.apache.aurora.scheduler.testing.FakeStatsProvider;
-import org.apache.mesos.Protos;
-import org.easymock.EasyMock;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.TASK_PROCESSOR_RUN_NAME;
-import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.attemptsStatName;
-import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.slotSearchStatName;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.assertEquals;
-
-public class PendingTaskProcessorTest extends EasyMockTest {
-  private static final String CACHE_STAT = "cache_size";
-  private static final String SLAVE_ID_1 = "slave_id_1";
-  private static final String SLAVE_ID_2 = "slave_id_2";
-  private static final JobKey JOB_A = new JobKey("role_a", "env", "job_a");
-  private static final JobKey JOB_B = new JobKey("role_b", "env", "job_b");
-  private static final IScheduledTask TASK_A = makeTask(JOB_A, SLAVE_ID_1, "id1");
-  private static final IScheduledTask TASK_B = makeTask(JOB_B, SLAVE_ID_2, "id2");
-  private static final PreemptionProposal SLOT_A = createPreemptionProposal(TASK_A, SLAVE_ID_1);
-  private static final Amount<Long, Time> PREEMPTION_DELAY = Amount.of(30L, Time.SECONDS);
-  private static final Amount<Long, Time> EXPIRATION = Amount.of(10L, Time.MINUTES);
-
-  private StorageTestUtil storageUtil;
-  private OfferManager offerManager;
-  private FakeStatsProvider statsProvider;
-  private PreemptionVictimFilter preemptionVictimFilter;
-  private PendingTaskProcessor slotFinder;
-  private BiCache<PreemptionProposal, TaskGroupKey> slotCache;
-  private ClusterState clusterState;
-  private FakeClock clock;
-
-  @Before
-  public void setUp() {
-    storageUtil = new StorageTestUtil(this);
-    storageUtil.expectOperations();
-    offerManager = createMock(OfferManager.class);
-    preemptionVictimFilter = createMock(PreemptionVictimFilter.class);
-    statsProvider = new FakeStatsProvider();
-    clusterState = createMock(ClusterState.class);
-    clock = new FakeClock();
-    slotCache = new BiCache<>(
-        statsProvider,
-        new BiCache.BiCacheSettings(EXPIRATION, CACHE_STAT),
-        clock);
-
-    slotFinder = new PendingTaskProcessor(
-        storageUtil.storage,
-        offerManager,
-        preemptionVictimFilter,
-        new PreemptorMetrics(new CachedCounters(statsProvider)),
-        PREEMPTION_DELAY,
-        slotCache,
-        clusterState,
-        clock);
-  }
-
-  @Test
-  public void testSearchSlotSuccessful() throws Exception {
-    expectGetPendingTasks(TASK_A, TASK_B);
-    expectGetClusterState(TASK_A, TASK_B);
-    HostOffer offer1 = makeOffer(SLAVE_ID_1);
-    HostOffer offer2 = makeOffer(SLAVE_ID_2);
-    expectOffers(offer1, offer2);
-    expectSlotSearch(TASK_A.getAssignedTask().getTask(), TASK_A);
-    expectSlotSearch(TASK_B.getAssignedTask().getTask(), TASK_B);
-
-    control.replay();
-
-    clock.advance(PREEMPTION_DELAY);
-
-    slotFinder.run();
-    assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME));
-    assertEquals(2L, statsProvider.getLongValue(attemptsStatName(true)));
-    assertEquals(2L, statsProvider.getLongValue(slotSearchStatName(true, true)));
-    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, true)));
-    assertEquals(2L, statsProvider.getLongValue(CACHE_STAT));
-  }
-
-  @Test
-  public void testSearchSlotFailed() throws Exception {
-    expectGetPendingTasks(TASK_A);
-    expectGetClusterState(TASK_A);
-    HostOffer offer1 = makeOffer(SLAVE_ID_1);
-    expectOffers(offer1);
-    expectSlotSearch(TASK_A.getAssignedTask().getTask());
-
-    control.replay();
-
-    clock.advance(PREEMPTION_DELAY);
-
-    slotFinder.run();
-    assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME));
-    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
-    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, true)));
-    assertEquals(1L, statsProvider.getLongValue(slotSearchStatName(false, true)));
-  }
-
-  @Test
-  public void testHasCachedSlots() throws Exception {
-    slotCache.put(SLOT_A, group(TASK_A));
-    expectGetPendingTasks(TASK_A);
-    expectGetClusterState(TASK_A);
-    HostOffer offer1 = makeOffer(SLAVE_ID_1);
-    expectOffers(offer1);
-
-    control.replay();
-
-    clock.advance(PREEMPTION_DELAY);
-
-    slotFinder.run();
-    assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME));
-    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true)));
-    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, true)));
-    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, true)));
-  }
-
-  @Test
-  public void testMultipleTaskGroups() throws Exception {
-    IScheduledTask task1 = makeTask(JOB_A, "1");
-    IScheduledTask task2 = makeTask(JOB_A, "2");
-    IScheduledTask task3 = makeTask(JOB_A, "3");
-    IScheduledTask task4 = makeTask(JOB_B, "4");
-    IScheduledTask task5 = makeTask(JOB_B, "5");
-
-    expectGetPendingTasks(task1, task4, task2, task5, task3);
-    expectGetClusterState(TASK_A, TASK_B);
-
-    HostOffer offer1 = makeOffer(SLAVE_ID_1);
-    HostOffer offer2 = makeOffer(SLAVE_ID_2);
-    expectOffers(offer1, offer2);
-    expectSlotSearch(task1.getAssignedTask().getTask());
-    expectSlotSearch(task4.getAssignedTask().getTask(), TASK_B);
-    PreemptionProposal proposal1 = createPreemptionProposal(TASK_B, SLAVE_ID_1);
-    PreemptionProposal proposal2 = createPreemptionProposal(TASK_B, SLAVE_ID_2);
-
-    control.replay();
-
-    clock.advance(PREEMPTION_DELAY);
-
-    slotFinder.run();
-    assertEquals(slotCache.get(proposal1), Optional.of(group(task4)));
-    assertEquals(slotCache.get(proposal2), Optional.of(group(task5)));
-    assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME));
-    assertEquals(3L, statsProvider.getLongValue(attemptsStatName(true)));
-    assertEquals(2L, statsProvider.getLongValue(slotSearchStatName(true, true)));
-    assertEquals(2L, statsProvider.getLongValue(slotSearchStatName(false, true)));
-    assertEquals(2L, statsProvider.getLongValue(CACHE_STAT));
-  }
-
-  @Test
-  public void testNoVictims() throws Exception {
-    expectGetClusterState();
-    control.replay();
-
-    clock.advance(PREEMPTION_DELAY);
-
-    slotFinder.run();
-    assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME));
-    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true)));
-    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, true)));
-    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, true)));
-  }
-
-  private Multimap<String, PreemptionVictim> getVictims(IScheduledTask... tasks) {
-    return Multimaps.transformValues(
-        Multimaps.index(Arrays.asList(tasks), task -> task.getAssignedTask().getSlaveId()),
-        task -> PreemptionVictim.fromTask(task.getAssignedTask())
-    );
-  }
-
-  private HostOffer makeOffer(String slaveId) {
-    Protos.Offer.Builder builder = Protos.Offer.newBuilder();
-    builder.getIdBuilder().setValue("id");
-    builder.getFrameworkIdBuilder().setValue("framework-id");
-    builder.getSlaveIdBuilder().setValue(slaveId);
-    builder.setHostname(slaveId);
-    return new HostOffer(
-        builder.build(),
-        IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE)));
-  }
-
-  private void expectOffers(HostOffer... offers) {
-    expect(offerManager.getOffers()).andReturn(ImmutableSet.copyOf(offers));
-  }
-
-  private void expectGetClusterState(IScheduledTask... returnedTasks) {
-    expect(clusterState.getSlavesToActiveTasks()).andReturn(getVictims(returnedTasks));
-  }
-
-  private void expectSlotSearch(ITaskConfig config, IScheduledTask... victims) {
-    expect(preemptionVictimFilter.filterPreemptionVictims(
-        eq(config),
-        EasyMock.anyObject(),
-        anyObject(AttributeAggregate.class),
-        EasyMock.anyObject(),
-        eq(storageUtil.storeProvider)));
-    expectLastCall().andReturn(
-        victims.length == 0
-            ? Optional.absent()
-            : Optional.of(ImmutableSet.copyOf(getVictims(victims).values())))
-        .anyTimes();
-  }
-
-  private static PreemptionProposal createPreemptionProposal(IScheduledTask task, String slaveId) {
-    return new PreemptionProposal(
-        ImmutableSet.of(PreemptionVictim.fromTask(task.getAssignedTask())),
-        slaveId);
-  }
-
-  private static IScheduledTask makeTask(JobKey key, String taskId) {
-    return makeTask(key, null, taskId);
-  }
-
-  private static TaskGroupKey group(IScheduledTask task) {
-    return TaskGroupKey.from(task.getAssignedTask().getTask());
-  }
-
-  private static IScheduledTask makeTask(JobKey key, @Nullable String slaveId, String taskId) {
-    ScheduledTask task = new ScheduledTask()
-        .setAssignedTask(new AssignedTask()
-            .setSlaveId(slaveId)
-            .setTaskId(taskId)
-            .setTask(new TaskConfig()
-                .setPriority(1)
-                .setProduction(true)
-                .setJob(key)));
-    task.addToTaskEvents(new TaskEvent(0, PENDING));
-    return IScheduledTask.build(task);
-  }
-
-  private void expectGetPendingTasks(IScheduledTask... returnedTasks) {
-    storageUtil.expectTaskFetch(Query.statusScoped(PENDING), ImmutableSet.copyOf(returnedTasks));
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilterTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilterTest.java
deleted file mode 100644
index 5fe8e2e..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilterTest.java
+++ /dev/null
@@ -1,512 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async.preemptor;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Data;
-import com.twitter.common.testing.easymock.EasyMockTest;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.Attribute;
-import org.apache.aurora.gen.Constraint;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.JobKey;
-import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.configuration.Resources;
-import org.apache.aurora.scheduler.filter.SchedulingFilter;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
-import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
-import org.apache.aurora.scheduler.mesos.TaskExecutors;
-import org.apache.aurora.scheduler.stats.CachedCounters;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.apache.aurora.scheduler.testing.FakeStatsProvider;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.easymock.IExpectationSetters;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.MaintenanceMode.NONE;
-import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.MISSING_ATTRIBUTES_NAME;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
-import static org.apache.mesos.Protos.Offer;
-import static org.apache.mesos.Protos.Resource;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-
-public class PreemptionVictimFilterTest extends EasyMockTest {
-  private static final String USER_A = "user_a";
-  private static final String USER_B = "user_b";
-  private static final String USER_C = "user_c";
-  private static final String JOB_A = "job_a";
-  private static final String JOB_B = "job_b";
-  private static final String JOB_C = "job_c";
-  private static final String TASK_ID_A = "task_a";
-  private static final String TASK_ID_B = "task_b";
-  private static final String TASK_ID_C = "task_c";
-  private static final String TASK_ID_D = "task_d";
-  private static final String HOST = "host";
-  private static final String RACK = "rack";
-  private static final String SLAVE_ID = HOST + "_id";
-  private static final String RACK_ATTRIBUTE = "rack";
-  private static final String HOST_ATTRIBUTE = "host";
-  private static final String OFFER = "offer";
-  private static final Optional<HostOffer> NO_OFFER = Optional.absent();
-
-  private StorageTestUtil storageUtil;
-  private SchedulingFilter schedulingFilter;
-  private FakeStatsProvider statsProvider;
-  private PreemptorMetrics preemptorMetrics;
-
-  @Before
-  public void setUp() {
-    storageUtil = new StorageTestUtil(this);
-    storageUtil.expectOperations();
-    statsProvider = new FakeStatsProvider();
-    preemptorMetrics = new PreemptorMetrics(new CachedCounters(statsProvider));
-  }
-
-  private Optional<ImmutableSet<PreemptionVictim>> runFilter(
-      ScheduledTask pendingTask,
-      Optional<HostOffer> offer,
-      ScheduledTask... victims) {
-
-    PreemptionVictimFilter.PreemptionVictimFilterImpl filter =
-        new PreemptionVictimFilter.PreemptionVictimFilterImpl(
-            schedulingFilter,
-            TaskExecutors.NO_OVERHEAD_EXECUTOR,
-            preemptorMetrics);
-
-    return filter.filterPreemptionVictims(
-        ITaskConfig.build(pendingTask.getAssignedTask().getTask()),
-        preemptionVictims(victims),
-        EMPTY,
-        offer,
-        storageUtil.mutableStoreProvider);
-  }
-
-  @Test
-  public void testPreempted() throws Exception {
-    setUpHost();
-
-    schedulingFilter = createMock(SchedulingFilter.class);
-    ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A);
-    assignToHost(lowPriority);
-
-    ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 100);
-
-    expectFiltering();
-
-    control.replay();
-    assertVictims(runFilter(highPriority, NO_OFFER, lowPriority), lowPriority);
-  }
-
-  @Test
-  public void testLowestPriorityPreempted() throws Exception {
-    setUpHost();
-
-    schedulingFilter = createMock(SchedulingFilter.class);
-    ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 10);
-    assignToHost(lowPriority);
-
-    ScheduledTask lowerPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 1);
-    assignToHost(lowerPriority);
-
-    ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_C, 100);
-
-    expectFiltering();
-
-    control.replay();
-    assertVictims(runFilter(highPriority, NO_OFFER, lowerPriority), lowerPriority);
-  }
-
-  @Test
-  public void testOnePreemptableTask() throws Exception {
-    setUpHost();
-
-    schedulingFilter = createMock(SchedulingFilter.class);
-    ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 100);
-    assignToHost(highPriority);
-
-    ScheduledTask lowerPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 99);
-    assignToHost(lowerPriority);
-
-    ScheduledTask lowestPriority = makeTask(USER_A, JOB_A, TASK_ID_C, 1);
-    assignToHost(lowestPriority);
-
-    ScheduledTask pendingPriority = makeTask(USER_A, JOB_A, TASK_ID_D, 98);
-
-    expectFiltering();
-
-    control.replay();
-    assertVictims(
-        runFilter(pendingPriority, NO_OFFER, highPriority, lowerPriority, lowestPriority),
-        lowestPriority);
-  }
-
-  @Test
-  public void testHigherPriorityRunning() throws Exception {
-    schedulingFilter = createMock(SchedulingFilter.class);
-    ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 100);
-    assignToHost(highPriority);
-
-    ScheduledTask task = makeTask(USER_A, JOB_A, TASK_ID_A);
-
-    control.replay();
-    assertNoVictims(runFilter(task, NO_OFFER, highPriority));
-  }
-
-  @Test
-  public void testProductionPreemptingNonproduction() throws Exception {
-    setUpHost();
-
-    schedulingFilter = createMock(SchedulingFilter.class);
-    // Use a very low priority for the production task to show that priority is irrelevant.
-    ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
-    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_B + "_a1", 100);
-    assignToHost(a1);
-
-    expectFiltering();
-
-    control.replay();
-    assertVictims(runFilter(p1, NO_OFFER, a1), a1);
-  }
-
-  @Test
-  public void testProductionPreemptingNonproductionAcrossUsers() throws Exception {
-    setUpHost();
-
-    schedulingFilter = createMock(SchedulingFilter.class);
-    // Use a very low priority for the production task to show that priority is irrelevant.
-    ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
-    ScheduledTask a1 = makeTask(USER_B, JOB_A, TASK_ID_B + "_a1", 100);
-    assignToHost(a1);
-
-    expectFiltering();
-
-    control.replay();
-    assertVictims(runFilter(p1, NO_OFFER, a1), a1);
-  }
-
-  @Test
-  public void testProductionUsersDoNotPreemptEachOther() throws Exception {
-    schedulingFilter = createMock(SchedulingFilter.class);
-    ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", 1000);
-    ScheduledTask a1 = makeProductionTask(USER_B, JOB_A, TASK_ID_B + "_a1", 0);
-    assignToHost(a1);
-
-    control.replay();
-    assertNoVictims(runFilter(p1, NO_OFFER, a1));
-  }
-
-  // Ensures a production task can preempt 2 tasks on the same host.
-  @Test
-  public void testProductionPreemptingManyNonProduction() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
-    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
-    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-
-    ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1");
-    b1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-
-    setUpHost();
-
-    assignToHost(a1);
-    assignToHost(b1);
-
-    ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
-    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
-
-    control.replay();
-    assertVictims(runFilter(p1, NO_OFFER, a1, b1), a1, b1);
-  }
-
-  // Ensures we select the minimal number of tasks to preempt
-  @Test
-  public void testMinimalSetPreempted() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
-    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
-    a1.getAssignedTask().getTask().setNumCpus(4).setRamMb(4096);
-
-    ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1");
-    b1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-
-    ScheduledTask b2 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b2");
-    b2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-
-    setUpHost();
-
-    assignToHost(a1);
-    assignToHost(b1);
-    assignToHost(b2);
-
-    ScheduledTask p1 = makeProductionTask(USER_C, JOB_C, TASK_ID_C + "_p1");
-    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
-
-    control.replay();
-    assertVictims(runFilter(p1, NO_OFFER, b1, b2, a1), a1);
-  }
-
-  // Ensures a production task *never* preempts a production task from another job.
-  @Test
-  public void testProductionJobNeverPreemptsProductionJob() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
-    ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1");
-    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
-
-    setUpHost();
-
-    assignToHost(p1);
-
-    ScheduledTask p2 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p2");
-    p2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-
-    control.replay();
-    assertNoVictims(runFilter(p2, NO_OFFER, p1));
-  }
-
-  // Ensures that we can preempt if a task + offer can satisfy a pending task.
-  @Test
-  public void testPreemptWithOfferAndTask() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
-
-    setUpHost();
-
-    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
-    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-    assignToHost(a1);
-
-    ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
-    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
-
-    control.replay();
-    assertVictims(
-        runFilter(p1, makeOffer(OFFER, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1), a1),
-        a1);
-  }
-
-  // Ensures we can preempt if two tasks and an offer can satisfy a pending task.
-  @Test
-  public void testPreemptWithOfferAndMultipleTasks() throws Exception {
-    schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
-
-    setUpHost();
-
-    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
-    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-    assignToHost(a1);
-
-    ScheduledTask a2 = makeTask(USER_A, JOB_B, TASK_ID_A + "_a2");
-    a2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-    assignToHost(a2);
-
-    ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
-    p1.getAssignedTask().getTask().setNumCpus(4).setRamMb(2048);
-
-    control.replay();
-    Optional<HostOffer> offer =
-        makeOffer(OFFER, 2, Amount.of(1024L, Data.MB), Amount.of(1L, Data.MB), 1);
-    assertVictims(runFilter(p1, offer, a1, a2), a1, a2);
-  }
-
-  @Test
-  public void testNoPreemptionVictims() {
-    schedulingFilter = createMock(SchedulingFilter.class);
-    ScheduledTask task = makeProductionTask(USER_A, JOB_A, TASK_ID_A);
-
-    control.replay();
-
-    assertNoVictims(runFilter(task, NO_OFFER));
-  }
-
-  @Test
-  public void testMissingAttributes() {
-    schedulingFilter = createMock(SchedulingFilter.class);
-    ScheduledTask task = makeProductionTask(USER_A, JOB_A, TASK_ID_A);
-    assignToHost(task);
-
-    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
-    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-    assignToHost(a1);
-
-    expect(storageUtil.attributeStore.getHostAttributes(HOST)).andReturn(Optional.absent());
-
-    control.replay();
-
-    assertNoVictims(runFilter(task, NO_OFFER, a1));
-    assertEquals(1L, statsProvider.getLongValue(MISSING_ATTRIBUTES_NAME));
-  }
-
-  @Test
-  public void testAllVictimsVetoed() {
-    schedulingFilter = createMock(SchedulingFilter.class);
-    ScheduledTask task = makeProductionTask(USER_A, JOB_A, TASK_ID_A);
-    assignToHost(task);
-
-    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
-    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
-    assignToHost(a1);
-
-    setUpHost();
-    expectFiltering(Optional.of(Veto.constraintMismatch("ban")));
-
-    control.replay();
-
-    assertNoVictims(runFilter(task, NO_OFFER, a1));
-  }
-
-  private static ImmutableSet<PreemptionVictim> preemptionVictims(ScheduledTask... tasks) {
-    return FluentIterable.from(ImmutableSet.copyOf(tasks))
-        .transform(
-            new Function<ScheduledTask, PreemptionVictim>() {
-              @Override
-              public PreemptionVictim apply(ScheduledTask task) {
-                return PreemptionVictim.fromTask(IAssignedTask.build(task.getAssignedTask()));
-              }
-            }).toSet();
-  }
-
-  private static void assertVictims(
-      Optional<ImmutableSet<PreemptionVictim>> actual,
-      ScheduledTask... expected) {
-
-    assertEquals(Optional.of(preemptionVictims(expected)), actual);
-  }
-
-  private static void assertNoVictims(Optional<ImmutableSet<PreemptionVictim>> actual) {
-    assertEquals(Optional.<ImmutableSet<PreemptionVictim>>absent(), actual);
-  }
-
-  private Optional<HostOffer> makeOffer(
-      String offerId,
-      double cpu,
-      Amount<Long, Data> ram,
-      Amount<Long, Data> disk,
-      int numPorts) {
-
-    List<Resource> resources = new Resources(cpu, ram, disk, numPorts).toResourceList();
-    Offer.Builder builder = Offer.newBuilder();
-    builder.getIdBuilder().setValue(offerId);
-    builder.getFrameworkIdBuilder().setValue("framework-id");
-    builder.getSlaveIdBuilder().setValue(SLAVE_ID);
-    builder.setHostname(HOST);
-    for (Resource r: resources) {
-      builder.addResources(r);
-    }
-    return Optional.of(new HostOffer(
-        builder.build(),
-        IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE))));
-  }
-
-  private IExpectationSetters<Set<SchedulingFilter.Veto>> expectFiltering() {
-    return expectFiltering(Optional.absent());
-  }
-
-  private IExpectationSetters<Set<SchedulingFilter.Veto>> expectFiltering(
-      final Optional<Veto> veto) {
-
-    return expect(schedulingFilter.filter(
-        EasyMock.anyObject(),
-        EasyMock.anyObject()))
-        .andAnswer(
-            new IAnswer<Set<SchedulingFilter.Veto>>() {
-              @Override
-              public Set<SchedulingFilter.Veto> answer() {
-                return veto.asSet();
-              }
-            });
-  }
-
-  static ScheduledTask makeTask(
-      String role,
-      String job,
-      String taskId,
-      int priority,
-      String env,
-      boolean production) {
-
-    AssignedTask assignedTask = new AssignedTask()
-        .setTaskId(taskId)
-        .setTask(new TaskConfig()
-            .setJob(new JobKey(role, env, job))
-            .setPriority(priority)
-            .setProduction(production)
-            .setJobName(job)
-            .setEnvironment(env)
-            .setConstraints(new HashSet<Constraint>()));
-    return new ScheduledTask().setAssignedTask(assignedTask);
-  }
-
-  static ScheduledTask makeTask(String role, String job, String taskId) {
-    return makeTask(role, job, taskId, 0, "dev", false);
-  }
-
-  static void addEvent(ScheduledTask task, ScheduleStatus status) {
-    task.addToTaskEvents(new TaskEvent(0, status));
-  }
-
-  private ScheduledTask makeProductionTask(String role, String job, String taskId) {
-    return makeTask(role, job, taskId, 0, "prod", true);
-  }
-
-  private ScheduledTask makeProductionTask(String role, String job, String taskId, int priority) {
-    return makeTask(role, job, taskId, priority, "prod", true);
-  }
-
-  private ScheduledTask makeTask(String role, String job, String taskId, int priority) {
-    return makeTask(role, job, taskId, priority, "dev", false);
-  }
-
-  private void assignToHost(ScheduledTask task) {
-    task.setStatus(RUNNING);
-    addEvent(task, RUNNING);
-    task.getAssignedTask().setSlaveHost(HOST);
-    task.getAssignedTask().setSlaveId(SLAVE_ID);
-  }
-
-  private Attribute host(String host) {
-    return new Attribute(HOST_ATTRIBUTE, ImmutableSet.of(host));
-  }
-
-  private Attribute rack(String rack) {
-    return new Attribute(RACK_ATTRIBUTE, ImmutableSet.of(rack));
-  }
-
-  // Sets up a normal host, no dedicated hosts and no maintenance.
-  private void setUpHost() {
-    IHostAttributes hostAttrs = IHostAttributes.build(
-        new HostAttributes().setHost(HOST).setSlaveId(HOST + "_id")
-            .setMode(NONE).setAttributes(ImmutableSet.of(rack(RACK), host(RACK))));
-
-    expect(storageUtil.attributeStore.getHostAttributes(HOST))
-        .andReturn(Optional.of(hostAttrs)).anyTimes();
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimTest.java
deleted file mode 100644
index bb93b63..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async.preemptor;
-
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.JobKey;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-public class PreemptionVictimTest {
-
-  @Test
-  public void testBeanMethods() {
-    PreemptionVictim a = makeVictim("a");
-    PreemptionVictim a1 = makeVictim("a");
-    PreemptionVictim b = makeVictim("b");
-    assertEquals(a, a1);
-    assertEquals(a.hashCode(), a1.hashCode());
-    assertEquals(a.toString(), a1.toString());
-    assertNotEquals(a, b);
-    assertNotEquals(a.toString(), b.toString());
-    assertEquals(ImmutableSet.of(a, b), ImmutableSet.of(a, a1, b));
-  }
-
-  private PreemptionVictim makeVictim(String taskId) {
-    return PreemptionVictim.fromTask(IAssignedTask.build(new AssignedTask()
-        .setTaskId(taskId)
-        .setSlaveId(taskId + "slave")
-        .setSlaveHost(taskId + "host")
-        .setTask(new TaskConfig().setJob(new JobKey("role", "env", "job")))));
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
deleted file mode 100644
index d36499f..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async.preemptor;
-
-import java.util.Set;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-import com.twitter.common.testing.easymock.EasyMockTest;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.JobKey;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.async.OfferManager;
-import org.apache.aurora.scheduler.async.preemptor.Preemptor.PreemptorImpl;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.state.StateChangeResult;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.stats.CachedCounters;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.testing.FakeStatsProvider;
-import org.apache.mesos.Protos;
-import org.easymock.EasyMock;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.slotValidationStatName;
-import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.successStatName;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-
-public class PreemptorImplTest extends EasyMockTest {
-  private static final String SLAVE_ID = "slave_id";
-  private static final IScheduledTask TASK = IScheduledTask.build(makeTask());
-  private static final PreemptionProposal PROPOSAL = createPreemptionProposal(TASK);
-  private static final TaskGroupKey GROUP_KEY =
-      TaskGroupKey.from(ITaskConfig.build(makeTask().getAssignedTask().getTask()));
-
-  private static final Set<PreemptionProposal> NO_SLOTS = ImmutableSet.of();
-  private static final Optional<String> EMPTY_RESULT = Optional.absent();
-  private static final HostOffer OFFER =
-      new HostOffer(Protos.Offer.getDefaultInstance(), IHostAttributes.build(new HostAttributes()));
-
-  private StateManager stateManager;
-  private FakeStatsProvider statsProvider;
-  private PreemptionVictimFilter preemptionVictimFilter;
-  private PreemptorImpl preemptor;
-  private BiCache<PreemptionProposal, TaskGroupKey> slotCache;
-  private Storage.MutableStoreProvider storeProvider;
-
-  @Before
-  public void setUp() {
-    storeProvider = createMock(Storage.MutableStoreProvider.class);
-    stateManager = createMock(StateManager.class);
-    preemptionVictimFilter = createMock(PreemptionVictimFilter.class);
-    slotCache = createMock(new Clazz<BiCache<PreemptionProposal, TaskGroupKey>>() { });
-    statsProvider = new FakeStatsProvider();
-    OfferManager offerManager = createMock(OfferManager.class);
-    expect(offerManager.getOffer(anyObject(Protos.SlaveID.class)))
-        .andReturn(Optional.of(OFFER))
-        .anyTimes();
-
-    preemptor = new PreemptorImpl(
-        stateManager,
-        offerManager,
-        preemptionVictimFilter,
-        new PreemptorMetrics(new CachedCounters(statsProvider)),
-        slotCache);
-  }
-
-  @Test
-  public void testPreemptTasksSuccessful() throws Exception {
-    expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL));
-    slotCache.remove(PROPOSAL, GROUP_KEY);
-    expectSlotValidation(PROPOSAL, Optional.of(ImmutableSet.of(
-        PreemptionVictim.fromTask(TASK.getAssignedTask()))));
-
-    expectPreempted(TASK);
-
-    control.replay();
-
-    assertEquals(Optional.of(SLAVE_ID), callPreemptor());
-    assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(true)));
-    assertEquals(1L, statsProvider.getLongValue(successStatName(true)));
-  }
-
-  @Test
-  public void testPreemptTasksValidationFailed() throws Exception {
-    expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL));
-    slotCache.remove(PROPOSAL, GROUP_KEY);
-    expectSlotValidation(PROPOSAL, Optional.absent());
-
-    control.replay();
-
-    assertEquals(EMPTY_RESULT, callPreemptor());
-    assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(false)));
-    assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
-  }
-
-  @Test
-  public void testNoCachedSlot() throws Exception {
-    expect(slotCache.getByValue(GROUP_KEY)).andReturn(NO_SLOTS);
-
-    control.replay();
-
-    assertEquals(EMPTY_RESULT, callPreemptor());
-    assertEquals(0L, statsProvider.getLongValue(slotValidationStatName(false)));
-    assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
-  }
-
-  private Optional<String> callPreemptor() {
-    return preemptor.attemptPreemptionFor(TASK.getAssignedTask(), EMPTY, storeProvider);
-  }
-
-  private void expectSlotValidation(
-      PreemptionProposal slot,
-      Optional<ImmutableSet<PreemptionVictim>> victims) {
-
-    expect(preemptionVictimFilter.filterPreemptionVictims(
-        TASK.getAssignedTask().getTask(),
-        slot.getVictims(),
-        EMPTY,
-        Optional.of(OFFER),
-        storeProvider)).andReturn(victims);
-  }
-
-  private void expectPreempted(IScheduledTask preempted) throws Exception {
-    expect(stateManager.changeState(
-        anyObject(Storage.MutableStoreProvider.class),
-        eq(Tasks.id(preempted)),
-        eq(Optional.absent()),
-        eq(ScheduleStatus.PREEMPTING),
-        EasyMock.anyObject()))
-        .andReturn(StateChangeResult.SUCCESS);
-  }
-
-  private static PreemptionProposal createPreemptionProposal(IScheduledTask task) {
-    IAssignedTask assigned = task.getAssignedTask();
-    return new PreemptionProposal(ImmutableSet.of(PreemptionVictim.fromTask(assigned)), SLAVE_ID);
-  }
-
-  private static ScheduledTask makeTask() {
-    ScheduledTask task = new ScheduledTask()
-        .setAssignedTask(new AssignedTask()
-            .setTask(new TaskConfig()
-                .setPriority(1)
-                .setProduction(true)
-                .setJob(new JobKey("role", "env", "name"))));
-    task.addToTaskEvents(new TaskEvent(0, PENDING));
-    return task;
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
deleted file mode 100644
index 2c20571..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async.preemptor;
-
-import com.google.common.base.Optional;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.Key;
-import com.google.inject.Module;
-import com.twitter.common.application.StartupStage;
-import com.twitter.common.application.modules.LifecycleModule;
-import com.twitter.common.base.ExceptionalCommand;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.testing.easymock.EasyMockTest;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.filter.SchedulingFilter;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.state.TaskAssigner;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class PreemptorModuleTest extends EasyMockTest {
-
-  private StorageTestUtil storageUtil;
-
-  @Before
-  public void setUp() {
-    storageUtil = new StorageTestUtil(this);
-    storageUtil.expectOperations();
-  }
-
-  private Injector createInjector(Module module) {
-    return Guice.createInjector(
-        module,
-        new LifecycleModule(),
-        new AbstractModule() {
-          private <T> void bindMock(Class<T> clazz) {
-            bind(clazz).toInstance(createMock(clazz));
-          }
-
-          @Override
-          protected void configure() {
-            bindMock(SchedulingFilter.class);
-            bindMock(StateManager.class);
-            bindMock(TaskAssigner.class);
-            bindMock(Thread.UncaughtExceptionHandler.class);
-            bind(Storage.class).toInstance(storageUtil.storage);
-          }
-        });
-  }
-
-  @Test
-  public void testPreemptorDisabled() throws Exception {
-    Injector injector = createInjector(new PreemptorModule(
-        false,
-        Amount.of(0L, Time.SECONDS),
-        Amount.of(0L, Time.SECONDS)));
-
-    control.replay();
-
-    injector.getInstance(Key.get(ExceptionalCommand.class, StartupStage.class)).execute();
-
-    injector.getBindings();
-    assertEquals(
-        Optional.absent(),
-        injector.getInstance(Preemptor.class).attemptPreemptionFor(
-            IAssignedTask.build(new AssignedTask()),
-            AttributeAggregate.EMPTY,
-            storageUtil.mutableStoreProvider));
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java b/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java
index 0d9aeff..91b91bc 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java
@@ -47,12 +47,12 @@ import com.twitter.thrift.ServiceInstance;
 
 import org.apache.aurora.gen.ServerInfo;
 import org.apache.aurora.scheduler.SchedulerServicesModule;
-import org.apache.aurora.scheduler.async.OfferManager;
-import org.apache.aurora.scheduler.async.RescheduleCalculator;
-import org.apache.aurora.scheduler.async.TaskGroups.TaskGroupsSettings;
-import org.apache.aurora.scheduler.async.TaskScheduler;
 import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.http.api.GsonMessageBodyHandler;
+import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
+import org.apache.aurora.scheduler.scheduling.TaskGroups.TaskGroupsSettings;
+import org.apache.aurora.scheduler.scheduling.TaskScheduler;
 import org.apache.aurora.scheduler.state.LockManager;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IServerInfo;

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
index 49af15b..050a654 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
@@ -32,13 +32,13 @@ import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.TaskStatusHandler;
-import org.apache.aurora.scheduler.async.OfferManager;
 import org.apache.aurora.scheduler.base.Conversions;
 import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStatusReceived;
+import org.apache.aurora.scheduler.offers.OfferManager;
 import org.apache.aurora.scheduler.stats.CachedCounters;
 import org.apache.aurora.scheduler.storage.Storage.StorageException;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
new file mode 100644
index 0000000..04be32e
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.offers;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.logging.Level;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.testing.TearDown;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl;
+import org.apache.aurora.scheduler.offers.OfferManager.OfferReturnDelay;
+import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
+import org.apache.mesos.Protos.TaskInfo;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
+import static org.apache.aurora.gen.MaintenanceMode.NONE;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class OfferManagerImplTest extends EasyMockTest {
+
+  private static final Amount<Long, Time> RETURN_DELAY = Amount.of(1L, Time.DAYS);
+  private static final String HOST_A = "HOST_A";
+  private static final HostOffer OFFER_A = new HostOffer(
+      Offers.makeOffer("OFFER_A", HOST_A),
+      IHostAttributes.build(new HostAttributes().setMode(NONE)));
+  private static final String HOST_B = "HOST_B";
+  private static final HostOffer OFFER_B = new HostOffer(
+      Offers.makeOffer("OFFER_B", HOST_B),
+      IHostAttributes.build(new HostAttributes().setMode(NONE)));
+  private static final String HOST_C = "HOST_C";
+  private static final HostOffer OFFER_C = new HostOffer(
+      Offers.makeOffer("OFFER_C", HOST_C),
+      IHostAttributes.build(new HostAttributes().setMode(NONE)));
+  private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(
+      ITaskConfig.build(new TaskConfig().setJob(new JobKey("role", "env", "name"))));
+
+  private Driver driver;
+  private FakeScheduledExecutor clock;
+  private Function<HostOffer, Assignment> offerAcceptor;
+  private OfferManagerImpl offerManager;
+
+  @Before
+  public void setUp() {
+    // LOG is reached through the class: offerManager is still unassigned at this point.
+    OfferManagerImpl.LOG.setLevel(Level.FINE);
+    addTearDown(new TearDown() {
+      @Override
+      public void tearDown() throws Exception {
+        OfferManagerImpl.LOG.setLevel(Level.INFO);  // Reset to INFO once the test finishes.
+      }
+    });
+    driver = createMock(Driver.class);
+    ScheduledExecutorService executorMock = createMock(ScheduledExecutorService.class);
+    clock = FakeScheduledExecutor.scheduleExecutor(executorMock);
+    addTearDown(new TearDown() {
+      @Override
+      public void tearDown() throws Exception {
+        clock.assertEmpty();  // Runs at tear-down, after the test body has completed.
+      }
+    });
+    offerAcceptor = createMock(new Clazz<Function<HostOffer, Assignment>>() { });
+    OfferReturnDelay returnDelay = new OfferReturnDelay() {
+      @Override
+      public Amount<Long, Time> get() {
+        return RETURN_DELAY;  // Constant delay, so tests can advance the clock by exactly this.
+      }
+    };
+    offerManager = new OfferManagerImpl(driver, returnDelay, executorMock);
+  }
+
+  @Test
+  public void testOffersSorted() throws Exception {
+    // Ensures that non-DRAINING offers are preferred - the DRAINING offer would be tried last.
+
+    HostOffer offerA = setMode(OFFER_A, DRAINING);
+    HostOffer offerC = setMode(OFFER_C, DRAINING);
+
+    TaskInfo task = TaskInfo.getDefaultInstance();
+    expect(offerAcceptor.apply(OFFER_B)).andReturn(Assignment.success(task));
+    driver.launchTask(OFFER_B.getOffer().getId(), task);
+
+    driver.declineOffer(offerA.getOffer().getId());
+    driver.declineOffer(offerC.getOffer().getId());
+
+    control.replay();
+
+    offerManager.addOffer(offerA);
+    offerManager.addOffer(OFFER_B);
+    offerManager.addOffer(offerC);
+    assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+    clock.advance(RETURN_DELAY);
+  }
+
+  @Test
+  public void testGetOffersReturnsAllOffers() throws Exception {
+    expect(offerAcceptor.apply(OFFER_A))
+        .andReturn(Assignment.failure(ImmutableSet.of(Veto.constraintMismatch("denied"))));
+
+    control.replay();
+
+    offerManager.addOffer(OFFER_A);
+    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
+
+    offerManager.cancelOffer(OFFER_A.getOffer().getId());
+    assertTrue(Iterables.isEmpty(offerManager.getOffers()));
+
+    clock.advance(RETURN_DELAY);
+  }
+
+  @Test
+  public void testOfferFilteringDueToStaticBan() throws Exception {
+    expect(offerAcceptor.apply(OFFER_A))
+        .andReturn(Assignment.failure(ImmutableSet.of(Veto.constraintMismatch("denied"))));
+
+    TaskInfo task = TaskInfo.getDefaultInstance();
+    expect(offerAcceptor.apply(OFFER_B)).andReturn(Assignment.success(task));
+    driver.launchTask(OFFER_B.getOffer().getId(), task);
+
+    driver.declineOffer(OFFER_A.getOffer().getId());
+
+    control.replay();
+
+    offerManager.addOffer(OFFER_A);
+    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+    // Run again to make sure all offers are banned (via no expectations set).
+    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+
+    // Add a new offer to accept the task previously banned for OFFER_A.
+    offerManager.addOffer(OFFER_B);
+    assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+
+    clock.advance(RETURN_DELAY);
+  }
+
+  @Test
+  public void testStaticBanIsCleared() throws Exception {
+    expect(offerAcceptor.apply(OFFER_A))
+        .andReturn(Assignment.failure(ImmutableSet.of(Veto.insufficientResources("ram", 100))));
+
+    TaskInfo task = TaskInfo.getDefaultInstance();
+    expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(task));
+    driver.launchTask(OFFER_A.getOffer().getId(), task);
+
+    expect(offerAcceptor.apply(OFFER_A))
+        .andReturn(Assignment.failure(ImmutableSet.of(Veto.maintenance("draining"))));
+
+    expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(task));
+    driver.launchTask(OFFER_A.getOffer().getId(), task);
+
+    driver.declineOffer(OFFER_A.getOffer().getId());
+
+    control.replay();
+
+    offerManager.addOffer(OFFER_A);
+    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+
+    // Make sure the static ban is cleared when the offers are returned.
+    clock.advance(RETURN_DELAY);
+    offerManager.addOffer(OFFER_A);
+    assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+
+    offerManager.addOffer(OFFER_A);
+    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+
+    // Make sure the static ban is cleared when driver is disconnected.
+    offerManager.driverDisconnected(new DriverDisconnected());
+    offerManager.addOffer(OFFER_A);
+    assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+
+    clock.advance(RETURN_DELAY);
+  }
+
+  @Test
+  public void testFlushOffers() throws Exception {
+    control.replay();
+
+    offerManager.addOffer(OFFER_A);
+    offerManager.addOffer(OFFER_B);
+    offerManager.driverDisconnected(new DriverDisconnected());
+    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+    clock.advance(RETURN_DELAY);
+  }
+
+  @Test
+  public void testDeclineOffer() throws Exception {
+    driver.declineOffer(OFFER_A.getOffer().getId());
+
+    control.replay();
+
+    offerManager.addOffer(OFFER_A);
+    clock.advance(RETURN_DELAY);
+  }
+
+  private static HostOffer setMode(HostOffer offer, MaintenanceMode mode) {
+    // Same underlying offer; only the maintenance mode of the host attributes is replaced.
+    IHostAttributes attrs = IHostAttributes.build(offer.getAttributes().newBuilder().setMode(mode));
+    return new HostOffer(offer.getOffer(), attrs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/offers/Offers.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/offers/Offers.java b/src/test/java/org/apache/aurora/scheduler/offers/Offers.java
new file mode 100644
index 0000000..c0899b0
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/offers/Offers.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.offers;
+
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.SlaveID;
+
+/**
+ * Utility class for creating resource offers in unit tests.
+ */
+public final class Offers {
+  private Offers() {
+    // Utility class.
+  }
+
+  public static final String DEFAULT_HOST = "hostname";
+
+  public static Offer makeOffer(String offerId) {
+    return Offers.makeOffer(offerId, DEFAULT_HOST);
+  }
+
+  public static Offer makeOffer(String offerId, String hostName) {
+    return Offer.newBuilder()
+        .setId(OfferID.newBuilder().setValue(offerId))
+        .setFrameworkId(FrameworkID.newBuilder().setValue("framework_id"))
+        .setSlaveId(SlaveID.newBuilder().setValue("slave_id-" + offerId))
+        .setHostname(hostName)
+        .build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/offers/RandomJitterReturnDelayTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/offers/RandomJitterReturnDelayTest.java b/src/test/java/org/apache/aurora/scheduler/offers/RandomJitterReturnDelayTest.java
new file mode 100644
index 0000000..be21cd6
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/offers/RandomJitterReturnDelayTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.offers;
+
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.Random;
+
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+public class RandomJitterReturnDelayTest extends EasyMockTest {
+  private void assertRandomJitterReturnDelay(
+      int minHoldTimeMs,
+      int jitterWindowMs,
+      boolean shouldThrow) {
+
+    int randomValue = 123;
+
+    Random mockRandom = control.createMock(Random.class);
+
+    if (!shouldThrow) {
+      expect(mockRandom.nextInt(jitterWindowMs)).andReturn(randomValue);
+    }
+
+    control.replay();
+
+    assertEquals(
+        minHoldTimeMs + randomValue,
+        new RandomJitterReturnDelay(
+            minHoldTimeMs,
+            jitterWindowMs,
+            mockRandom).get().getValue().intValue());
+  }
+
+  @Test
+  public void testRandomJitterReturnDelay() throws Exception {
+    assertRandomJitterReturnDelay(100, 200, false);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testNegativeHoldTimeThrowsIllegalArgumentException() throws Exception {
+    assertRandomJitterReturnDelay(-1, 200, true);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testNegativeWindowThrowsIllegalArgumentException() throws Exception {
+    assertRandomJitterReturnDelay(100, -1, true);
+  }
+
+  @Test
+  public void testZeroHoldTime() throws Exception {
+    assertRandomJitterReturnDelay(0, 200, false);
+  }
+
+  @Test
+  public void testZeroWindow() throws Exception {
+    assertRandomJitterReturnDelay(100, 0, false);
+  }
+
+  @Test
+  public void testZeroHoldTimeZeroWindow() throws Exception {
+    assertRandomJitterReturnDelay(0, 0, false);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/preemptor/BiCacheTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/BiCacheTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/BiCacheTest.java
new file mode 100644
index 0000000..7312091
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/BiCacheTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.preemptor;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.scheduler.preemptor.BiCache.BiCacheSettings;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class BiCacheTest {
+  private static final Amount<Long, Time> HOLD_DURATION = Amount.of(1L, Time.MINUTES);
+  private static final String STAT_NAME = "cache_size_stat";
+  private static final String KEY_1 = "Key 1";
+  private static final String KEY_2 = "Key 2";
+  private static final Optional<Integer> NO_VALUE = Optional.absent();
+
+  private FakeStatsProvider statsProvider;
+  private FakeClock clock;
+  private BiCache<String, Integer> biCache;
+
+  @Before
+  public void setUp() {
+    statsProvider = new FakeStatsProvider();
+    clock = new FakeClock();
+    biCache = new BiCache<>(statsProvider, new BiCacheSettings(HOLD_DURATION, STAT_NAME), clock);
+  }
+
+  @Test
+  public void testExpiration() {
+    biCache.put(KEY_1, 1);
+    assertEquals(Optional.of(1), biCache.get(KEY_1));
+    assertEquals(1L, statsProvider.getLongValue(STAT_NAME));
+
+    clock.advance(HOLD_DURATION);
+
+    assertEquals(NO_VALUE, biCache.get(KEY_1));
+    assertEquals(ImmutableSet.of(), biCache.getByValue(1));
+    assertEquals(0L, statsProvider.getLongValue(STAT_NAME));
+  }
+
+  @Test
+  public void testRemoval() {
+    biCache.put(KEY_1, 1);
+    assertEquals(1L, statsProvider.getLongValue(STAT_NAME));
+    assertEquals(Optional.of(1), biCache.get(KEY_1));
+    biCache.remove(KEY_1, 1);
+    assertEquals(NO_VALUE, biCache.get(KEY_1));
+    assertEquals(0L, statsProvider.getLongValue(STAT_NAME));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testRemovalWithNullKey() {
+    biCache.remove(null, 1);
+  }
+
+  @Test
+  public void testDifferentKeysIdenticalValues() {
+    biCache.put(KEY_1, 1);
+    biCache.put(KEY_2, 1);
+    assertEquals(2L, statsProvider.getLongValue(STAT_NAME));
+
+    assertEquals(Optional.of(1), biCache.get(KEY_1));
+    assertEquals(Optional.of(1), biCache.get(KEY_2));
+    assertEquals(ImmutableSet.of(KEY_1, KEY_2), biCache.getByValue(1));
+
+    biCache.remove(KEY_1, 1);
+    assertEquals(NO_VALUE, biCache.get(KEY_1));
+    assertEquals(Optional.of(1), biCache.get(KEY_2));
+    assertEquals(ImmutableSet.of(KEY_2), biCache.getByValue(1));
+    assertEquals(1L, statsProvider.getLongValue(STAT_NAME));
+
+    clock.advance(HOLD_DURATION);
+    assertEquals(NO_VALUE, biCache.get(KEY_1));
+    assertEquals(NO_VALUE, biCache.get(KEY_2));
+    assertEquals(ImmutableSet.of(), biCache.getByValue(1));
+    assertEquals(0L, statsProvider.getLongValue(STAT_NAME));
+  }
+
+  @Test
+  public void testIdenticalKeysDifferentValues() {
+    biCache.put(KEY_1, 1);
+    biCache.put(KEY_1, 2);
+    assertEquals(Optional.of(2), biCache.get(KEY_1));
+    assertEquals(ImmutableSet.of(), biCache.getByValue(1));
+    assertEquals(ImmutableSet.of(KEY_1), biCache.getByValue(2));
+    assertEquals(1L, statsProvider.getLongValue(STAT_NAME));
+  }
+}


Mime
View raw message