aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [2/8] aurora git commit: Break apart async package and AsyncModule into purpose-specific equivalents.
Date Wed, 22 Jul 2015 19:39:59 GMT
http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/preemptor/ClusterStateImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/ClusterStateImplTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/ClusterStateImplTest.java
new file mode 100644
index 0000000..a1ac922
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/ClusterStateImplTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.preemptor;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMultimap;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.FAILED;
+import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.junit.Assert.assertEquals;
+
+public class ClusterStateImplTest {
+
+  private ClusterStateImpl state;
+
+  @Before
+  public void setUp() {
+    state = new ClusterStateImpl();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testImmutable() {
+    state.getSlavesToActiveTasks().clear();
+  }
+
+  @Test
+  public void testTaskLifecycle() {
+    IAssignedTask a = makeTask("a", "s1");
+
+    assertVictims();
+    changeState(a, THROTTLED);
+    assertVictims();
+    changeState(a, PENDING);
+    assertVictims();
+    changeState(a, ASSIGNED);
+    assertVictims(a);
+    changeState(a, RUNNING);
+    assertVictims(a);
+    changeState(a, KILLING);
+    assertVictims(a);
+    changeState(a, FINISHED);
+    assertVictims();
+  }
+
+  @Test
+  public void testTaskChangesSlaves() {
+    // We do not intend to handle the case of an external failure leading to the same task ID
+    // on a different slave.
+    IAssignedTask a = makeTask("a", "s1");
+    IAssignedTask a1 = makeTask("a", "s2");
+    changeState(a, RUNNING);
+    changeState(a1, RUNNING);
+    assertVictims(a, a1);
+  }
+
+  @Test
+  public void testMultipleTasks() {
+    IAssignedTask a = makeTask("a", "s1");
+    IAssignedTask b = makeTask("b", "s1");
+    IAssignedTask c = makeTask("c", "s2");
+    IAssignedTask d = makeTask("d", "s3");
+    IAssignedTask e = makeTask("e", "s3");
+    IAssignedTask f = makeTask("f", "s1");
+    changeState(a, RUNNING);
+    assertVictims(a);
+    changeState(b, RUNNING);
+    assertVictims(a, b);
+    changeState(c, RUNNING);
+    assertVictims(a, b, c);
+    changeState(d, RUNNING);
+    assertVictims(a, b, c, d);
+    changeState(e, RUNNING);
+    assertVictims(a, b, c, d, e);
+    changeState(c, FINISHED);
+    assertVictims(a, b, d, e);
+    changeState(a, FAILED);
+    changeState(e, KILLED);
+    assertVictims(b, d);
+    changeState(f, RUNNING);
+    assertVictims(b, d, f);
+  }
+
+  private void assertVictims(IAssignedTask... tasks) {
+    ImmutableMultimap.Builder<String, PreemptionVictim> victims = ImmutableMultimap.builder();
+    for (IAssignedTask task : tasks) {
+      victims.put(task.getSlaveId(), PreemptionVictim.fromTask(task));
+    }
+    assertEquals(HashMultimap.create(victims.build()), state.getSlavesToActiveTasks());
+  }
+
+  private IAssignedTask makeTask(String taskId, String slaveId) {
+    return IAssignedTask.build(new AssignedTask()
+        .setTaskId(taskId)
+        .setSlaveId(slaveId)
+        .setSlaveHost(slaveId + "host")
+        .setTask(new TaskConfig().setJob(new JobKey("role", "env", "job"))));
+  }
+
+  private void changeState(IAssignedTask assignedTask, ScheduleStatus status) {
+    IScheduledTask task = IScheduledTask.build(new ScheduledTask()
+        .setStatus(status)
+        .setAssignedTask(assignedTask.newBuilder()));
+    state.taskChangedState(TaskStateChange.transition(task, ScheduleStatus.INIT));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java
new file mode 100644
index 0000000..b9cb5bf
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.preemptor;
+
+import java.util.Arrays;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.stats.CachedCounters;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.apache.mesos.Protos;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.TASK_PROCESSOR_RUN_NAME;
+import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.attemptsStatName;
+import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.slotSearchStatName;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+
+public class PendingTaskProcessorTest extends EasyMockTest {
+  private static final String CACHE_STAT = "cache_size";
+  private static final String SLAVE_ID_1 = "slave_id_1";
+  private static final String SLAVE_ID_2 = "slave_id_2";
+  private static final JobKey JOB_A = new JobKey("role_a", "env", "job_a");
+  private static final JobKey JOB_B = new JobKey("role_b", "env", "job_b");
+  private static final IScheduledTask TASK_A = makeTask(JOB_A, SLAVE_ID_1, "id1");
+  private static final IScheduledTask TASK_B = makeTask(JOB_B, SLAVE_ID_2, "id2");
+  private static final PreemptionProposal SLOT_A = createPreemptionProposal(TASK_A, SLAVE_ID_1);
+  private static final Amount<Long, Time> PREEMPTION_DELAY = Amount.of(30L, Time.SECONDS);
+  private static final Amount<Long, Time> EXPIRATION = Amount.of(10L, Time.MINUTES);
+
+  private StorageTestUtil storageUtil;
+  private OfferManager offerManager;
+  private FakeStatsProvider statsProvider;
+  private PreemptionVictimFilter preemptionVictimFilter;
+  private PendingTaskProcessor slotFinder;
+  private BiCache<PreemptionProposal, TaskGroupKey> slotCache;
+  private ClusterState clusterState;
+  private FakeClock clock;
+
+  @Before
+  public void setUp() {
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+    offerManager = createMock(OfferManager.class);
+    preemptionVictimFilter = createMock(PreemptionVictimFilter.class);
+    statsProvider = new FakeStatsProvider();
+    clusterState = createMock(ClusterState.class);
+    clock = new FakeClock();
+    slotCache = new BiCache<>(
+        statsProvider,
+        new BiCache.BiCacheSettings(EXPIRATION, CACHE_STAT),
+        clock);
+
+    slotFinder = new PendingTaskProcessor(
+        storageUtil.storage,
+        offerManager,
+        preemptionVictimFilter,
+        new PreemptorMetrics(new CachedCounters(statsProvider)),
+        PREEMPTION_DELAY,
+        slotCache,
+        clusterState,
+        clock);
+  }
+
+  @Test
+  public void testSearchSlotSuccessful() throws Exception {
+    expectGetPendingTasks(TASK_A, TASK_B);
+    expectGetClusterState(TASK_A, TASK_B);
+    HostOffer offer1 = makeOffer(SLAVE_ID_1);
+    HostOffer offer2 = makeOffer(SLAVE_ID_2);
+    expectOffers(offer1, offer2);
+    expectSlotSearch(TASK_A.getAssignedTask().getTask(), TASK_A);
+    expectSlotSearch(TASK_B.getAssignedTask().getTask(), TASK_B);
+
+    control.replay();
+
+    clock.advance(PREEMPTION_DELAY);
+
+    slotFinder.run();
+    assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME));
+    assertEquals(2L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(2L, statsProvider.getLongValue(slotSearchStatName(true, true)));
+    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, true)));
+    assertEquals(2L, statsProvider.getLongValue(CACHE_STAT));
+  }
+
+  @Test
+  public void testSearchSlotFailed() throws Exception {
+    expectGetPendingTasks(TASK_A);
+    expectGetClusterState(TASK_A);
+    HostOffer offer1 = makeOffer(SLAVE_ID_1);
+    expectOffers(offer1);
+    expectSlotSearch(TASK_A.getAssignedTask().getTask());
+
+    control.replay();
+
+    clock.advance(PREEMPTION_DELAY);
+
+    slotFinder.run();
+    assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME));
+    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, true)));
+    assertEquals(1L, statsProvider.getLongValue(slotSearchStatName(false, true)));
+  }
+
+  @Test
+  public void testHasCachedSlots() throws Exception {
+    slotCache.put(SLOT_A, group(TASK_A));
+    expectGetPendingTasks(TASK_A);
+    expectGetClusterState(TASK_A);
+    HostOffer offer1 = makeOffer(SLAVE_ID_1);
+    expectOffers(offer1);
+
+    control.replay();
+
+    clock.advance(PREEMPTION_DELAY);
+
+    slotFinder.run();
+    assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME));
+    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, true)));
+    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, true)));
+  }
+
+  @Test
+  public void testMultipleTaskGroups() throws Exception {
+    IScheduledTask task1 = makeTask(JOB_A, "1");
+    IScheduledTask task2 = makeTask(JOB_A, "2");
+    IScheduledTask task3 = makeTask(JOB_A, "3");
+    IScheduledTask task4 = makeTask(JOB_B, "4");
+    IScheduledTask task5 = makeTask(JOB_B, "5");
+
+    expectGetPendingTasks(task1, task4, task2, task5, task3);
+    expectGetClusterState(TASK_A, TASK_B);
+
+    HostOffer offer1 = makeOffer(SLAVE_ID_1);
+    HostOffer offer2 = makeOffer(SLAVE_ID_2);
+    expectOffers(offer1, offer2);
+    expectSlotSearch(task1.getAssignedTask().getTask());
+    expectSlotSearch(task4.getAssignedTask().getTask(), TASK_B);
+    PreemptionProposal proposal1 = createPreemptionProposal(TASK_B, SLAVE_ID_1);
+    PreemptionProposal proposal2 = createPreemptionProposal(TASK_B, SLAVE_ID_2);
+
+    control.replay();
+
+    clock.advance(PREEMPTION_DELAY);
+
+    slotFinder.run();
+    assertEquals(slotCache.get(proposal1), Optional.of(group(task4)));
+    assertEquals(slotCache.get(proposal2), Optional.of(group(task5)));
+    assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME));
+    assertEquals(3L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(2L, statsProvider.getLongValue(slotSearchStatName(true, true)));
+    assertEquals(2L, statsProvider.getLongValue(slotSearchStatName(false, true)));
+    assertEquals(2L, statsProvider.getLongValue(CACHE_STAT));
+  }
+
+  @Test
+  public void testNoVictims() throws Exception {
+    expectGetClusterState();
+    control.replay();
+
+    clock.advance(PREEMPTION_DELAY);
+
+    slotFinder.run();
+    assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME));
+    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, true)));
+    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, true)));
+  }
+
+  private Multimap<String, PreemptionVictim> getVictims(IScheduledTask... tasks) {
+    return Multimaps.transformValues(
+        Multimaps.index(Arrays.asList(tasks), task -> task.getAssignedTask().getSlaveId()),
+        task -> PreemptionVictim.fromTask(task.getAssignedTask())
+    );
+  }
+
+  private HostOffer makeOffer(String slaveId) {
+    Protos.Offer.Builder builder = Protos.Offer.newBuilder();
+    builder.getIdBuilder().setValue("id");
+    builder.getFrameworkIdBuilder().setValue("framework-id");
+    builder.getSlaveIdBuilder().setValue(slaveId);
+    builder.setHostname(slaveId);
+    return new HostOffer(
+        builder.build(),
+        IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE)));
+  }
+
+  private void expectOffers(HostOffer... offers) {
+    expect(offerManager.getOffers()).andReturn(ImmutableSet.copyOf(offers));
+  }
+
+  private void expectGetClusterState(IScheduledTask... returnedTasks) {
+    expect(clusterState.getSlavesToActiveTasks()).andReturn(getVictims(returnedTasks));
+  }
+
+  private void expectSlotSearch(ITaskConfig config, IScheduledTask... victims) {
+    expect(preemptionVictimFilter.filterPreemptionVictims(
+        eq(config),
+        EasyMock.anyObject(),
+        anyObject(AttributeAggregate.class),
+        EasyMock.anyObject(),
+        eq(storageUtil.storeProvider)));
+    expectLastCall().andReturn(
+        victims.length == 0
+            ? Optional.absent()
+            : Optional.of(ImmutableSet.copyOf(getVictims(victims).values())))
+        .anyTimes();
+  }
+
+  private static PreemptionProposal createPreemptionProposal(IScheduledTask task, String slaveId) {
+    return new PreemptionProposal(
+        ImmutableSet.of(PreemptionVictim.fromTask(task.getAssignedTask())),
+        slaveId);
+  }
+
+  private static IScheduledTask makeTask(JobKey key, String taskId) {
+    return makeTask(key, null, taskId);
+  }
+
+  private static TaskGroupKey group(IScheduledTask task) {
+    return TaskGroupKey.from(task.getAssignedTask().getTask());
+  }
+
+  private static IScheduledTask makeTask(JobKey key, @Nullable String slaveId, String taskId) {
+    ScheduledTask task = new ScheduledTask()
+        .setAssignedTask(new AssignedTask()
+            .setSlaveId(slaveId)
+            .setTaskId(taskId)
+            .setTask(new TaskConfig()
+                .setPriority(1)
+                .setProduction(true)
+                .setJob(key)));
+    task.addToTaskEvents(new TaskEvent(0, PENDING));
+    return IScheduledTask.build(task);
+  }
+
+  private void expectGetPendingTasks(IScheduledTask... returnedTasks) {
+    storageUtil.expectTaskFetch(Query.statusScoped(PENDING), ImmutableSet.copyOf(returnedTasks));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
new file mode 100644
index 0000000..997d326
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
@@ -0,0 +1,512 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.preemptor;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.Constraint;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.configuration.Resources;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
+import org.apache.aurora.scheduler.mesos.TaskExecutors;
+import org.apache.aurora.scheduler.stats.CachedCounters;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.MaintenanceMode.NONE;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
+import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.MISSING_ATTRIBUTES_NAME;
+import static org.apache.mesos.Protos.Offer;
+import static org.apache.mesos.Protos.Resource;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+public class PreemptionVictimFilterTest extends EasyMockTest {
+  private static final String USER_A = "user_a";
+  private static final String USER_B = "user_b";
+  private static final String USER_C = "user_c";
+  private static final String JOB_A = "job_a";
+  private static final String JOB_B = "job_b";
+  private static final String JOB_C = "job_c";
+  private static final String TASK_ID_A = "task_a";
+  private static final String TASK_ID_B = "task_b";
+  private static final String TASK_ID_C = "task_c";
+  private static final String TASK_ID_D = "task_d";
+  private static final String HOST = "host";
+  private static final String RACK = "rack";
+  private static final String SLAVE_ID = HOST + "_id";
+  private static final String RACK_ATTRIBUTE = "rack";
+  private static final String HOST_ATTRIBUTE = "host";
+  private static final String OFFER = "offer";
+  private static final Optional<HostOffer> NO_OFFER = Optional.absent();
+
+  private StorageTestUtil storageUtil;
+  private SchedulingFilter schedulingFilter;
+  private FakeStatsProvider statsProvider;
+  private PreemptorMetrics preemptorMetrics;
+
+  @Before
+  public void setUp() {
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+    statsProvider = new FakeStatsProvider();
+    preemptorMetrics = new PreemptorMetrics(new CachedCounters(statsProvider));
+  }
+
+  private Optional<ImmutableSet<PreemptionVictim>> runFilter(
+      ScheduledTask pendingTask,
+      Optional<HostOffer> offer,
+      ScheduledTask... victims) {
+
+    PreemptionVictimFilter.PreemptionVictimFilterImpl filter =
+        new PreemptionVictimFilter.PreemptionVictimFilterImpl(
+            schedulingFilter,
+            TaskExecutors.NO_OVERHEAD_EXECUTOR,
+            preemptorMetrics);
+
+    return filter.filterPreemptionVictims(
+        ITaskConfig.build(pendingTask.getAssignedTask().getTask()),
+        preemptionVictims(victims),
+        EMPTY,
+        offer,
+        storageUtil.mutableStoreProvider);
+  }
+
+  @Test
+  public void testPreempted() throws Exception {
+    setUpHost();
+
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A);
+    assignToHost(lowPriority);
+
+    ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 100);
+
+    expectFiltering();
+
+    control.replay();
+    assertVictims(runFilter(highPriority, NO_OFFER, lowPriority), lowPriority);
+  }
+
+  @Test
+  public void testLowestPriorityPreempted() throws Exception {
+    setUpHost();
+
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 10);
+    assignToHost(lowPriority);
+
+    ScheduledTask lowerPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 1);
+    assignToHost(lowerPriority);
+
+    ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_C, 100);
+
+    expectFiltering();
+
+    control.replay();
+    assertVictims(runFilter(highPriority, NO_OFFER, lowerPriority), lowerPriority);
+  }
+
+  @Test
+  public void testOnePreemptableTask() throws Exception {
+    setUpHost();
+
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 100);
+    assignToHost(highPriority);
+
+    ScheduledTask lowerPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 99);
+    assignToHost(lowerPriority);
+
+    ScheduledTask lowestPriority = makeTask(USER_A, JOB_A, TASK_ID_C, 1);
+    assignToHost(lowestPriority);
+
+    ScheduledTask pendingPriority = makeTask(USER_A, JOB_A, TASK_ID_D, 98);
+
+    expectFiltering();
+
+    control.replay();
+    assertVictims(
+        runFilter(pendingPriority, NO_OFFER, highPriority, lowerPriority, lowestPriority),
+        lowestPriority);
+  }
+
+  @Test
+  public void testHigherPriorityRunning() throws Exception {
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 100);
+    assignToHost(highPriority);
+
+    ScheduledTask task = makeTask(USER_A, JOB_A, TASK_ID_A);
+
+    control.replay();
+    assertNoVictims(runFilter(task, NO_OFFER, highPriority));
+  }
+
+  @Test
+  public void testProductionPreemptingNonproduction() throws Exception {
+    setUpHost();
+
+    schedulingFilter = createMock(SchedulingFilter.class);
+    // Use a very low priority for the production task to show that priority is irrelevant.
+    ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_B + "_a1", 100);
+    assignToHost(a1);
+
+    expectFiltering();
+
+    control.replay();
+    assertVictims(runFilter(p1, NO_OFFER, a1), a1);
+  }
+
+  @Test
+  public void testProductionPreemptingNonproductionAcrossUsers() throws Exception {
+    setUpHost();
+
+    schedulingFilter = createMock(SchedulingFilter.class);
+    // Use a very low priority for the production task to show that priority is irrelevant.
+    ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
+    ScheduledTask a1 = makeTask(USER_B, JOB_A, TASK_ID_B + "_a1", 100);
+    assignToHost(a1);
+
+    expectFiltering();
+
+    control.replay();
+    assertVictims(runFilter(p1, NO_OFFER, a1), a1);
+  }
+
+  @Test
+  public void testProductionUsersDoNotPreemptEachOther() throws Exception {
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", 1000);
+    ScheduledTask a1 = makeProductionTask(USER_B, JOB_A, TASK_ID_B + "_a1", 0);
+    assignToHost(a1);
+
+    control.replay();
+    assertNoVictims(runFilter(p1, NO_OFFER, a1));
+  }
+
+  // Ensures a production task can preempt 2 tasks on the same host.
+  @Test
+  public void testProductionPreemptingManyNonProduction() throws Exception {
+    schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+    ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1");
+    b1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+    setUpHost();
+
+    assignToHost(a1);
+    assignToHost(b1);
+
+    ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+    control.replay();
+    assertVictims(runFilter(p1, NO_OFFER, a1, b1), a1, b1);
+  }
+
+  // Ensures we select the minimal number of tasks to preempt
+  @Test
+  public void testMinimalSetPreempted() throws Exception {
+    schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(4).setRamMb(4096);
+
+    ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1");
+    b1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+    ScheduledTask b2 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b2");
+    b2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+    setUpHost();
+
+    assignToHost(a1);
+    assignToHost(b1);
+    assignToHost(b2);
+
+    ScheduledTask p1 = makeProductionTask(USER_C, JOB_C, TASK_ID_C + "_p1");
+    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+    control.replay();
+    assertVictims(runFilter(p1, NO_OFFER, b1, b2, a1), a1);
+  }
+
+  // Ensures a production task *never* preempts a production task from another job.
+  @Test
+  public void testProductionJobNeverPreemptsProductionJob() throws Exception {
+    schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+    ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1");
+    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+    setUpHost();
+
+    assignToHost(p1);
+
+    ScheduledTask p2 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p2");
+    p2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+    control.replay();
+    assertNoVictims(runFilter(p2, NO_OFFER, p1));
+  }
+
+  // Ensures that we can preempt if a task + offer can satisfy a pending task.
+  @Test
+  public void testPreemptWithOfferAndTask() throws Exception {
+    schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+
+    setUpHost();
+
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+    assignToHost(a1);
+
+    ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+    control.replay();
+    assertVictims(
+        runFilter(p1, makeOffer(OFFER, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1), a1),
+        a1);
+  }
+
+  // Ensures we can preempt if two tasks and an offer can satisfy a pending task.
+  @Test
+  public void testPreemptWithOfferAndMultipleTasks() throws Exception {
+    schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+
+    setUpHost();
+
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+    assignToHost(a1);
+
+    ScheduledTask a2 = makeTask(USER_A, JOB_B, TASK_ID_A + "_a2");
+    a2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+    assignToHost(a2);
+
+    ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+    p1.getAssignedTask().getTask().setNumCpus(4).setRamMb(2048);
+
+    control.replay();
+    Optional<HostOffer> offer =
+        makeOffer(OFFER, 2, Amount.of(1024L, Data.MB), Amount.of(1L, Data.MB), 1);
+    assertVictims(runFilter(p1, offer, a1, a2), a1, a2);
+  }
+
+  @Test
+  public void testNoPreemptionVictims() {
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask task = makeProductionTask(USER_A, JOB_A, TASK_ID_A);
+
+    control.replay();
+
+    assertNoVictims(runFilter(task, NO_OFFER));
+  }
+
+  @Test
+  public void testMissingAttributes() {
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask task = makeProductionTask(USER_A, JOB_A, TASK_ID_A);
+    assignToHost(task);
+
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+    assignToHost(a1);
+
+    expect(storageUtil.attributeStore.getHostAttributes(HOST)).andReturn(Optional.absent());
+
+    control.replay();
+
+    assertNoVictims(runFilter(task, NO_OFFER, a1));
+    assertEquals(1L, statsProvider.getLongValue(MISSING_ATTRIBUTES_NAME));
+  }
+
+  @Test
+  public void testAllVictimsVetoed() {
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask task = makeProductionTask(USER_A, JOB_A, TASK_ID_A);
+    assignToHost(task);
+
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+    assignToHost(a1);
+
+    setUpHost();
+    expectFiltering(Optional.of(Veto.constraintMismatch("ban")));
+
+    control.replay();
+
+    assertNoVictims(runFilter(task, NO_OFFER, a1));
+  }
+
+  private static ImmutableSet<PreemptionVictim> preemptionVictims(ScheduledTask... tasks) {
+    return FluentIterable.from(ImmutableSet.copyOf(tasks))
+        .transform(
+            new Function<ScheduledTask, PreemptionVictim>() {
+              @Override
+              public PreemptionVictim apply(ScheduledTask task) {
+                return PreemptionVictim.fromTask(IAssignedTask.build(task.getAssignedTask()));
+              }
+            }).toSet();
+  }
+
+  private static void assertVictims(
+      Optional<ImmutableSet<PreemptionVictim>> actual,
+      ScheduledTask... expected) {
+
+    assertEquals(Optional.of(preemptionVictims(expected)), actual);
+  }
+
+  private static void assertNoVictims(Optional<ImmutableSet<PreemptionVictim>> actual) {
+    assertEquals(Optional.<ImmutableSet<PreemptionVictim>>absent(), actual);
+  }
+
+  private Optional<HostOffer> makeOffer(
+      String offerId,
+      double cpu,
+      Amount<Long, Data> ram,
+      Amount<Long, Data> disk,
+      int numPorts) {
+
+    List<Resource> resources = new Resources(cpu, ram, disk, numPorts).toResourceList();
+    Offer.Builder builder = Offer.newBuilder();
+    builder.getIdBuilder().setValue(offerId);
+    builder.getFrameworkIdBuilder().setValue("framework-id");
+    builder.getSlaveIdBuilder().setValue(SLAVE_ID);
+    builder.setHostname(HOST);
+    for (Resource r: resources) {
+      builder.addResources(r);
+    }
+    return Optional.of(new HostOffer(
+        builder.build(),
+        IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE))));
+  }
+
+  private IExpectationSetters<Set<SchedulingFilter.Veto>> expectFiltering() {
+    return expectFiltering(Optional.absent());
+  }
+
+  private IExpectationSetters<Set<SchedulingFilter.Veto>> expectFiltering(
+      final Optional<Veto> veto) {
+
+    return expect(schedulingFilter.filter(
+        EasyMock.anyObject(),
+        EasyMock.anyObject()))
+        .andAnswer(
+            new IAnswer<Set<SchedulingFilter.Veto>>() {
+              @Override
+              public Set<SchedulingFilter.Veto> answer() {
+                return veto.asSet();
+              }
+            });
+  }
+
+  static ScheduledTask makeTask(
+      String role,
+      String job,
+      String taskId,
+      int priority,
+      String env,
+      boolean production) {
+
+    AssignedTask assignedTask = new AssignedTask()
+        .setTaskId(taskId)
+        .setTask(new TaskConfig()
+            .setJob(new JobKey(role, env, job))
+            .setPriority(priority)
+            .setProduction(production)
+            .setJobName(job)
+            .setEnvironment(env)
+            .setConstraints(new HashSet<Constraint>()));
+    return new ScheduledTask().setAssignedTask(assignedTask);
+  }
+
+  static ScheduledTask makeTask(String role, String job, String taskId) {
+    return makeTask(role, job, taskId, 0, "dev", false);
+  }
+
+  static void addEvent(ScheduledTask task, ScheduleStatus status) {
+    task.addToTaskEvents(new TaskEvent(0, status));
+  }
+
+  private ScheduledTask makeProductionTask(String role, String job, String taskId) {
+    return makeTask(role, job, taskId, 0, "prod", true);
+  }
+
+  private ScheduledTask makeProductionTask(String role, String job, String taskId, int priority) {
+    return makeTask(role, job, taskId, priority, "prod", true);
+  }
+
+  private ScheduledTask makeTask(String role, String job, String taskId, int priority) {
+    return makeTask(role, job, taskId, priority, "dev", false);
+  }
+
+  private void assignToHost(ScheduledTask task) {
+    task.setStatus(RUNNING);
+    addEvent(task, RUNNING);
+    task.getAssignedTask().setSlaveHost(HOST);
+    task.getAssignedTask().setSlaveId(SLAVE_ID);
+  }
+
+  private Attribute host(String host) {
+    return new Attribute(HOST_ATTRIBUTE, ImmutableSet.of(host));
+  }
+
+  private Attribute rack(String rack) {
+    return new Attribute(RACK_ATTRIBUTE, ImmutableSet.of(rack));
+  }
+
+  // Sets up a normal host, no dedicated hosts and no maintenance.
+  private void setUpHost() {
+    IHostAttributes hostAttrs = IHostAttributes.build(
+        new HostAttributes().setHost(HOST).setSlaveId(HOST + "_id")
+            .setMode(NONE).setAttributes(ImmutableSet.of(rack(RACK), host(RACK))));
+
+    expect(storageUtil.attributeStore.getHostAttributes(HOST))
+        .andReturn(Optional.of(hostAttrs)).anyTimes();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimTest.java
new file mode 100644
index 0000000..09380f9
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.preemptor;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class PreemptionVictimTest {
+
+  @Test
+  public void testBeanMethods() {
+    PreemptionVictim a = makeVictim("a");
+    PreemptionVictim a1 = makeVictim("a");
+    PreemptionVictim b = makeVictim("b");
+    assertEquals(a, a1);
+    assertEquals(a.hashCode(), a1.hashCode());
+    assertEquals(a.toString(), a1.toString());
+    assertNotEquals(a, b);
+    assertNotEquals(a.toString(), b.toString());
+    assertEquals(ImmutableSet.of(a, b), ImmutableSet.of(a, a1, b));
+  }
+
+  private PreemptionVictim makeVictim(String taskId) {
+    return PreemptionVictim.fromTask(IAssignedTask.build(new AssignedTask()
+        .setTaskId(taskId)
+        .setSlaveId(taskId + "slave")
+        .setSlaveHost(taskId + "host")
+        .setTask(new TaskConfig().setJob(new JobKey("role", "env", "job")))));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
new file mode 100644
index 0000000..b07ff7b
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.preemptor;
+
+import java.util.Set;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.preemptor.Preemptor.PreemptorImpl;
+import org.apache.aurora.scheduler.state.StateChangeResult;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.stats.CachedCounters;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.apache.mesos.Protos;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
+import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.slotValidationStatName;
+import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.successStatName;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+public class PreemptorImplTest extends EasyMockTest {
+  private static final String SLAVE_ID = "slave_id";
+  private static final IScheduledTask TASK = IScheduledTask.build(makeTask());
+  private static final PreemptionProposal PROPOSAL = createPreemptionProposal(TASK);
+  private static final TaskGroupKey GROUP_KEY =
+      TaskGroupKey.from(ITaskConfig.build(makeTask().getAssignedTask().getTask()));
+
+  private static final Set<PreemptionProposal> NO_SLOTS = ImmutableSet.of();
+  private static final Optional<String> EMPTY_RESULT = Optional.absent();
+  private static final HostOffer OFFER =
+      new HostOffer(Protos.Offer.getDefaultInstance(), IHostAttributes.build(new HostAttributes()));
+
+  private StateManager stateManager;
+  private FakeStatsProvider statsProvider;
+  private PreemptionVictimFilter preemptionVictimFilter;
+  private PreemptorImpl preemptor;
+  private BiCache<PreemptionProposal, TaskGroupKey> slotCache;
+  private Storage.MutableStoreProvider storeProvider;
+
+  @Before
+  public void setUp() {
+    storeProvider = createMock(Storage.MutableStoreProvider.class);
+    stateManager = createMock(StateManager.class);
+    preemptionVictimFilter = createMock(PreemptionVictimFilter.class);
+    slotCache = createMock(new Clazz<BiCache<PreemptionProposal, TaskGroupKey>>() { });
+    statsProvider = new FakeStatsProvider();
+    OfferManager offerManager = createMock(OfferManager.class);
+    expect(offerManager.getOffer(anyObject(Protos.SlaveID.class)))
+        .andReturn(Optional.of(OFFER))
+        .anyTimes();
+
+    preemptor = new PreemptorImpl(
+        stateManager,
+        offerManager,
+        preemptionVictimFilter,
+        new PreemptorMetrics(new CachedCounters(statsProvider)),
+        slotCache);
+  }
+
+  @Test
+  public void testPreemptTasksSuccessful() throws Exception {
+    expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL));
+    slotCache.remove(PROPOSAL, GROUP_KEY);
+    expectSlotValidation(PROPOSAL, Optional.of(ImmutableSet.of(
+        PreemptionVictim.fromTask(TASK.getAssignedTask()))));
+
+    expectPreempted(TASK);
+
+    control.replay();
+
+    assertEquals(Optional.of(SLAVE_ID), callPreemptor());
+    assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(true)));
+    assertEquals(1L, statsProvider.getLongValue(successStatName(true)));
+  }
+
+  @Test
+  public void testPreemptTasksValidationFailed() throws Exception {
+    expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL));
+    slotCache.remove(PROPOSAL, GROUP_KEY);
+    expectSlotValidation(PROPOSAL, Optional.absent());
+
+    control.replay();
+
+    assertEquals(EMPTY_RESULT, callPreemptor());
+    assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
+  }
+
+  @Test
+  public void testNoCachedSlot() throws Exception {
+    expect(slotCache.getByValue(GROUP_KEY)).andReturn(NO_SLOTS);
+
+    control.replay();
+
+    assertEquals(EMPTY_RESULT, callPreemptor());
+    assertEquals(0L, statsProvider.getLongValue(slotValidationStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
+  }
+
+  private Optional<String> callPreemptor() {
+    return preemptor.attemptPreemptionFor(TASK.getAssignedTask(), EMPTY, storeProvider);
+  }
+
+  private void expectSlotValidation(
+      PreemptionProposal slot,
+      Optional<ImmutableSet<PreemptionVictim>> victims) {
+
+    expect(preemptionVictimFilter.filterPreemptionVictims(
+        TASK.getAssignedTask().getTask(),
+        slot.getVictims(),
+        EMPTY,
+        Optional.of(OFFER),
+        storeProvider)).andReturn(victims);
+  }
+
+  private void expectPreempted(IScheduledTask preempted) throws Exception {
+    expect(stateManager.changeState(
+        anyObject(Storage.MutableStoreProvider.class),
+        eq(Tasks.id(preempted)),
+        eq(Optional.absent()),
+        eq(ScheduleStatus.PREEMPTING),
+        EasyMock.anyObject()))
+        .andReturn(StateChangeResult.SUCCESS);
+  }
+
+  private static PreemptionProposal createPreemptionProposal(IScheduledTask task) {
+    IAssignedTask assigned = task.getAssignedTask();
+    return new PreemptionProposal(ImmutableSet.of(PreemptionVictim.fromTask(assigned)), SLAVE_ID);
+  }
+
+  private static ScheduledTask makeTask() {
+    ScheduledTask task = new ScheduledTask()
+        .setAssignedTask(new AssignedTask()
+            .setTask(new TaskConfig()
+                .setPriority(1)
+                .setProduction(true)
+                .setJob(new JobKey("role", "env", "name"))));
+    task.addToTaskEvents(new TaskEvent(0, PENDING));
+    return task;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
new file mode 100644
index 0000000..ea76639
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.preemptor;
+
+import com.google.common.base.Optional;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Module;
+import com.twitter.common.application.StartupStage;
+import com.twitter.common.application.modules.LifecycleModule;
+import com.twitter.common.base.ExceptionalCommand;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.state.TaskAssigner;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class PreemptorModuleTest extends EasyMockTest {
+
+  private StorageTestUtil storageUtil;
+
+  @Before
+  public void setUp() {
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+  }
+
+  private Injector createInjector(Module module) {
+    return Guice.createInjector(
+        module,
+        new LifecycleModule(),
+        new AbstractModule() {
+          private <T> void bindMock(Class<T> clazz) {
+            bind(clazz).toInstance(createMock(clazz));
+          }
+
+          @Override
+          protected void configure() {
+            bindMock(SchedulingFilter.class);
+            bindMock(StateManager.class);
+            bindMock(TaskAssigner.class);
+            bindMock(Thread.UncaughtExceptionHandler.class);
+            bind(Storage.class).toInstance(storageUtil.storage);
+          }
+        });
+  }
+
+  @Test
+  public void testPreemptorDisabled() throws Exception {
+    Injector injector = createInjector(new PreemptorModule(
+        false,
+        Amount.of(0L, Time.SECONDS),
+        Amount.of(0L, Time.SECONDS)));
+
+    control.replay();
+
+    injector.getInstance(Key.get(ExceptionalCommand.class, StartupStage.class)).execute();
+
+    injector.getBindings();
+    assertEquals(
+        Optional.absent(),
+        injector.getInstance(Preemptor.class).attemptPreemptionFor(
+            IAssignedTask.build(new AssignedTask()),
+            AttributeAggregate.EMPTY,
+            storageUtil.mutableStoreProvider));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPrunerTest.java
new file mode 100644
index 0000000..814edef
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPrunerTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.pruning;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.collect.ImmutableSet;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.JobUpdateKey;
+import org.apache.aurora.scheduler.pruning.JobUpdateHistoryPruner.HistoryPrunerSettings;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+
+public class JobUpdateHistoryPrunerTest extends EasyMockTest {
+  @Test
+  public void testExecution() throws Exception {
+    StorageTestUtil storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+
+    final ScheduledExecutorService executor = createMock(ScheduledExecutorService.class);
+    FakeScheduledExecutor executorClock =
+        FakeScheduledExecutor.scheduleAtFixedRateExecutor(executor, 2);
+
+    Clock mockClock = createMock(Clock.class);
+    expect(mockClock.nowMillis()).andReturn(2L).times(2);
+
+    expect(storageUtil.jobUpdateStore.pruneHistory(1, 1))
+        .andReturn(ImmutableSet.of(
+            IJobUpdateKey.build(
+                new JobUpdateKey().setJob(new JobKey("role", "env", "job")).setId("id1"))));
+    expect(storageUtil.jobUpdateStore.pruneHistory(1, 1)).andReturn(ImmutableSet.of());
+
+    control.replay();
+
+    executorClock.assertEmpty();
+    JobUpdateHistoryPruner pruner = new JobUpdateHistoryPruner(
+        mockClock,
+        executor,
+        storageUtil.storage,
+        new HistoryPrunerSettings(
+            Amount.of(1L, Time.MILLISECONDS),
+            Amount.of(1L, Time.MILLISECONDS),
+            1));
+
+    pruner.startAsync().awaitRunning();
+    executorClock.advance(Amount.of(1L, Time.MILLISECONDS));
+    executorClock.advance(Amount.of(1L, Time.MILLISECONDS));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
new file mode 100644
index 0000000..461c4d0
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
@@ -0,0 +1,398 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.pruning;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.twitter.common.base.Command;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.ExecutorConfig;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.pruning.TaskHistoryPruner.HistoryPrunnerSettings;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLED;
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.STARTING;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.fail;
+
+public class TaskHistoryPrunerTest extends EasyMockTest {
+  private static final String JOB_A = "job-a";
+  private static final String TASK_ID = "task_id";
+  private static final String SLAVE_HOST = "HOST_A";
+  private static final Amount<Long, Time> ONE_MS = Amount.of(1L, Time.MILLISECONDS);
+  private static final Amount<Long, Time> ONE_MINUTE = Amount.of(1L, Time.MINUTES);
+  private static final Amount<Long, Time> ONE_DAY = Amount.of(1L, Time.DAYS);
+  private static final Amount<Long, Time> ONE_HOUR = Amount.of(1L, Time.HOURS);
+  private static final int PER_JOB_HISTORY = 2;
+
+  private ScheduledFuture<?> future;
+  private ScheduledExecutorService executor;
+  private FakeClock clock;
+  private StateManager stateManager;
+  private StorageTestUtil storageUtil;
+  private TaskHistoryPruner pruner;
+
+  @Before
+  public void setUp() {
+    future = createMock(new Clazz<ScheduledFuture<?>>() { });
+    executor = createMock(ScheduledExecutorService.class);
+    clock = new FakeClock();
+    stateManager = createMock(StateManager.class);
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+    pruner = new TaskHistoryPruner(
+        executor,
+        stateManager,
+        clock,
+        new HistoryPrunnerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY),
+        storageUtil.storage);
+  }
+
+  @Test
+  public void testNoPruning() {
+    long taskATimestamp = clock.nowMillis();
+    IScheduledTask a = makeTask("a", FINISHED);
+
+    clock.advance(ONE_MS);
+    long taskBTimestamp = clock.nowMillis();
+    IScheduledTask b = makeTask("b", LOST);
+
+    expectNoImmediatePrune(ImmutableSet.of(a));
+    expectOneDelayedPrune(taskATimestamp);
+    expectNoImmediatePrune(ImmutableSet.of(a, b));
+    expectOneDelayedPrune(taskBTimestamp);
+
+    control.replay();
+
+    pruner.recordStateChange(TaskStateChange.initialized(a));
+    pruner.recordStateChange(TaskStateChange.initialized(b));
+  }
+
+  @Test
+  public void testStorageStartedWithPruning() {
+    long taskATimestamp = clock.nowMillis();
+    IScheduledTask a = makeTask("a", FINISHED);
+
+    clock.advance(ONE_MINUTE);
+    long taskBTimestamp = clock.nowMillis();
+    IScheduledTask b = makeTask("b", LOST);
+
+    clock.advance(ONE_MINUTE);
+    long taskCTimestamp = clock.nowMillis();
+    IScheduledTask c = makeTask("c", FINISHED);
+
+    clock.advance(ONE_MINUTE);
+    IScheduledTask d = makeTask("d", FINISHED);
+    IScheduledTask e = makeTask("job-x", "e", FINISHED);
+
+    expectNoImmediatePrune(ImmutableSet.of(a));
+    expectOneDelayedPrune(taskATimestamp);
+    expectNoImmediatePrune(ImmutableSet.of(a, b));
+    expectOneDelayedPrune(taskBTimestamp);
+    expectImmediatePrune(ImmutableSet.of(a, b, c), a);
+    expectOneDelayedPrune(taskCTimestamp);
+    expectImmediatePrune(ImmutableSet.of(b, c, d), b);
+    expectDefaultDelayedPrune();
+    expectNoImmediatePrune(ImmutableSet.of(e));
+    expectDefaultDelayedPrune();
+
+    control.replay();
+
+    for (IScheduledTask task : ImmutableList.of(a, b, c, d, e)) {
+      pruner.recordStateChange(TaskStateChange.initialized(task));
+    }
+  }
+
+  @Test
+  public void testStateChange() {
+    IScheduledTask starting = makeTask("a", STARTING);
+    IScheduledTask running = copy(starting, RUNNING);
+    IScheduledTask killed = copy(starting, KILLED);
+
+    expectNoImmediatePrune(ImmutableSet.of(killed));
+    expectDefaultDelayedPrune();
+
+    control.replay();
+
+    // No future set for non-terminal state transition.
+    changeState(starting, running);
+
+    // Future set for terminal state transition.
+    changeState(running, killed);
+  }
+
+  @Test
+  public void testActivateFutureAndExceedHistoryGoal() {
+    IScheduledTask running = makeTask("a", RUNNING);
+    IScheduledTask killed = copy(running, KILLED);
+    expectNoImmediatePrune(ImmutableSet.of(running));
+    Capture<Runnable> delayedDelete = expectDefaultDelayedPrune();
+
+    // Expect task "a" to be pruned when future is activated.
+    expectDeleteTasks("a");
+
+    control.replay();
+
+    // Capture future for inactive task "a"
+    changeState(running, killed);
+    clock.advance(ONE_HOUR);
+    // Execute future to prune task "a" from the system.
+    delayedDelete.getValue().run();
+  }
+
+  @Test
+  public void testJobHistoryExceeded() {
+    IScheduledTask a = makeTask("a", RUNNING);
+    clock.advance(ONE_MS);
+    IScheduledTask aKilled = copy(a, KILLED);
+
+    IScheduledTask b = makeTask("b", RUNNING);
+    clock.advance(ONE_MS);
+    IScheduledTask bKilled = copy(b, KILLED);
+
+    IScheduledTask c = makeTask("c", RUNNING);
+    clock.advance(ONE_MS);
+    IScheduledTask cLost = copy(c, LOST);
+
+    IScheduledTask d = makeTask("d", RUNNING);
+    clock.advance(ONE_MS);
+    IScheduledTask dLost = copy(d, LOST);
+
+    expectNoImmediatePrune(ImmutableSet.of(a));
+    expectDefaultDelayedPrune();
+    expectNoImmediatePrune(ImmutableSet.of(a, b));
+    expectDefaultDelayedPrune();
+    expectNoImmediatePrune(ImmutableSet.of(a, b)); // no pruning yet due to min threshold
+    expectDefaultDelayedPrune();
+    clock.advance(ONE_HOUR);
+    expectImmediatePrune(ImmutableSet.of(a, b, c, d), a, b); // now prune 2 tasks
+    expectDefaultDelayedPrune();
+
+    control.replay();
+
+    changeState(a, aKilled);
+    changeState(b, bKilled);
+    changeState(c, cLost);
+    changeState(d, dLost);
+  }
+
+  // TODO(William Farner): Consider removing the thread safety tests.  Now that intrinsic locks
+  // are not used, it is rather awkward to test this.
+  @Test
+  public void testThreadSafeStateChangeEvent() throws Exception {
+    // This tests against regression where an executor pruning a task holds an intrinsic lock and
+    // an unrelated task state change in the scheduler fires an event that requires this intrinsic
+    // lock. This causes a deadlock when the executor tries to acquire a lock held by the event
+    // fired.
+
+    pruner = prunerWithRealExecutor();
+    Command onDeleted = new Command() {
+      @Override
+      public void execute() {
+        // The goal is to verify that the call does not deadlock. We do not care about the outcome.
+        IScheduledTask b = makeTask("b", ASSIGNED);
+
+        changeState(b, STARTING);
+      }
+    };
+    CountDownLatch taskDeleted = expectTaskDeleted(onDeleted, TASK_ID);
+
+    control.replay();
+
+    // Change the task to a terminal state and wait for it to be pruned.
+    changeState(makeTask(TASK_ID, RUNNING), KILLED);
+    taskDeleted.await();
+  }
+
+  private TaskHistoryPruner prunerWithRealExecutor() {
+    ScheduledExecutorService realExecutor = Executors.newScheduledThreadPool(1,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat("testThreadSafeEvents-executor")
+            .build());
+    return new TaskHistoryPruner(
+        realExecutor,
+        stateManager,
+        clock,
+        new HistoryPrunnerSettings(Amount.of(1L, Time.MILLISECONDS), ONE_MS, PER_JOB_HISTORY),
+        storageUtil.storage);
+  }
+
+  private CountDownLatch expectTaskDeleted(final Command onDelete, String taskId) {
+    final CountDownLatch deleteCalled = new CountDownLatch(1);
+    final CountDownLatch eventDelivered = new CountDownLatch(1);
+
+    Thread eventDispatch = new Thread() {
+      @Override
+      public void run() {
+        try {
+          deleteCalled.await();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          fail("Interrupted while awaiting for delete call.");
+          return;
+        }
+        onDelete.execute();
+        eventDelivered.countDown();
+      }
+    };
+    eventDispatch.setDaemon(true);
+    eventDispatch.setName(getClass().getName() + "-EventDispatch");
+    eventDispatch.start();
+
+    stateManager.deleteTasks(storageUtil.mutableStoreProvider, ImmutableSet.of(taskId));
+    expectLastCall().andAnswer(new IAnswer<Void>() {
+      @Override
+      public Void answer() {
+        deleteCalled.countDown();
+        try {
+          eventDelivered.await();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          fail("Interrupted while awaiting for event delivery.");
+        }
+        return null;
+      }
+    });
+
+    return eventDelivered;
+  }
+
+  private void expectDeleteTasks(String... tasks) {
+    stateManager.deleteTasks(storageUtil.mutableStoreProvider, ImmutableSet.copyOf(tasks));
+  }
+
+  private Capture<Runnable> expectDefaultDelayedPrune() {
+    return expectDelayedPrune(ONE_DAY.as(Time.MILLISECONDS), 1);
+  }
+
+  private Capture<Runnable> expectOneDelayedPrune(long timestampMillis) {
+    return expectDelayedPrune(timestampMillis, 1);
+  }
+
+  private void expectNoImmediatePrune(ImmutableSet<IScheduledTask> tasksInJob) {
+    expectImmediatePrune(tasksInJob);
+  }
+
+  private void expectImmediatePrune(
+      ImmutableSet<IScheduledTask> tasksInJob,
+      IScheduledTask... pruned) {
+
+    // Expect a deferred prune operation when a new task is being watched.
+    executor.submit(EasyMock.<Runnable>anyObject());
+    expectLastCall().andAnswer(
+        new IAnswer<Future<?>>() {
+          @Override
+          public Future<?> answer() {
+            Runnable work = (Runnable) EasyMock.getCurrentArguments()[0];
+            work.run();
+            return null;
+          }
+        }
+    );
+
+    IJobKey jobKey = Iterables.getOnlyElement(
+        FluentIterable.from(tasksInJob).transform(Tasks.SCHEDULED_TO_JOB_KEY).toSet());
+    storageUtil.expectTaskFetch(TaskHistoryPruner.jobHistoryQuery(jobKey), tasksInJob);
+    if (pruned.length > 0) {
+      stateManager.deleteTasks(storageUtil.mutableStoreProvider, Tasks.ids(pruned));
+    }
+  }
+
+  private Capture<Runnable> expectDelayedPrune(long timestampMillis, int count) {
+    Capture<Runnable> capture = createCapture();
+    executor.schedule(
+        EasyMock.capture(capture),
+        eq(pruner.calculateTimeout(timestampMillis)),
+        eq(TimeUnit.MILLISECONDS));
+    expectLastCall().andReturn(future).times(count);
+    return capture;
+  }
+
+  private void changeState(IScheduledTask oldStateTask, IScheduledTask newStateTask) {
+    pruner.recordStateChange(TaskStateChange.transition(newStateTask, oldStateTask.getStatus()));
+  }
+
+  private void changeState(IScheduledTask oldStateTask, ScheduleStatus status) {
+    pruner.recordStateChange(
+        TaskStateChange.transition(copy(oldStateTask, status), oldStateTask.getStatus()));
+  }
+
+  private IScheduledTask copy(IScheduledTask task, ScheduleStatus status) {
+    return IScheduledTask.build(task.newBuilder().setStatus(status));
+  }
+
+  private IScheduledTask makeTask(
+      String job,
+      String taskId,
+      ScheduleStatus status) {
+
+    return IScheduledTask.build(new ScheduledTask()
+        .setStatus(status)
+        .setTaskEvents(ImmutableList.of(new TaskEvent(clock.nowMillis(), status)))
+        .setAssignedTask(makeAssignedTask(job, taskId)));
+  }
+
+  private IScheduledTask makeTask(String taskId, ScheduleStatus status) {
+    return makeTask(JOB_A, taskId, status);
+  }
+
+  private AssignedTask makeAssignedTask(String job, String taskId) {
+    return new AssignedTask()
+        .setSlaveHost(SLAVE_HOST)
+        .setTaskId(taskId)
+        .setTask(new TaskConfig()
+            .setJob(new JobKey("role", "staging45", job))
+            .setOwner(new Identity().setRole("role").setUser("user"))
+            .setEnvironment("staging45")
+            .setJobName(job)
+            .setExecutorConfig(new ExecutorConfig("aurora", "config")));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java b/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java
new file mode 100644
index 0000000..26f65fa
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.reconciliation;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.ScheduledExecutorService;
+
+import javax.inject.Singleton;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.testing.TearDown;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.twitter.common.application.modules.LifecycleModule;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.BackoffStrategy;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.state.PubsubTestUtil;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+public class KillRetryTest extends EasyMockTest {
+
+  private Driver driver;
+  private StorageTestUtil storageUtil;
+  private BackoffStrategy backoffStrategy;
+  private FakeScheduledExecutor clock;
+  private EventBus eventBus;
+  private FakeStatsProvider statsProvider;
+
+  @Before
+  public void setUp() throws Exception {
+    driver = createMock(Driver.class);
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+    backoffStrategy = createMock(BackoffStrategy.class);
+    final ScheduledExecutorService executorMock = createMock(ScheduledExecutorService.class);
+    clock = FakeScheduledExecutor.scheduleExecutor(executorMock);
+    addTearDown(new TearDown() {
+      @Override
+      public void tearDown() {
+        clock.assertEmpty();
+      }
+    });
+    statsProvider = new FakeStatsProvider();
+
+    Injector injector = Guice.createInjector(
+        new LifecycleModule(),
+        new PubsubEventModule(false),
+        new AbstractModule() {
+          @Override
+          protected void configure() {
+            bind(Driver.class).toInstance(driver);
+            bind(Storage.class).toInstance(storageUtil.storage);
+            bind(ScheduledExecutorService.class).annotatedWith(AsyncExecutor.class)
+                .toInstance(executorMock);
+            PubsubEventModule.bindSubscriber(binder(), KillRetry.class);
+            bind(KillRetry.class).in(Singleton.class);
+            bind(BackoffStrategy.class).toInstance(backoffStrategy);
+            bind(StatsProvider.class).toInstance(statsProvider);
+            bind(UncaughtExceptionHandler.class)
+                .toInstance(createMock(UncaughtExceptionHandler.class));
+          }
+        }
+    );
+    eventBus = injector.getInstance(EventBus.class);
+    PubsubTestUtil.startPubsub(injector);
+  }
+
+  private static IScheduledTask makeTask(String id, ScheduleStatus status) {
+    return IScheduledTask.build(new ScheduledTask()
+        .setStatus(status)
+        .setAssignedTask(new AssignedTask().setTaskId(id)));
+  }
+
+  private void moveToKilling(String taskId) {
+    eventBus.post(TaskStateChange.transition(makeTask(taskId, KILLING), RUNNING));
+  }
+
+  private static Query.Builder killingQuery(String taskId) {
+    return Query.taskScoped(taskId).byStatus(KILLING);
+  }
+
+  private void expectGetRetryDelay(long prevRetryMs, long retryInMs) {
+    expect(backoffStrategy.calculateBackoffMs(prevRetryMs)).andReturn(retryInMs);
+  }
+
+  private void expectRetry(String taskId, long prevRetryMs, long nextRetryMs) {
+    storageUtil.expectTaskFetch(killingQuery(taskId), makeTask(taskId, KILLING));
+    driver.killTask(taskId);
+    expectGetRetryDelay(prevRetryMs, nextRetryMs);
+  }
+
+  @Test
+  public void testRetries() {
+    String taskId = "a";
+    expectGetRetryDelay(0, 100);
+    expectRetry(taskId, 100, 1000);
+    expectRetry(taskId, 1000, 10000);
+
+    // Signal that task has transitioned.
+    storageUtil.expectTaskFetch(killingQuery(taskId));
+
+    control.replay();
+
+    moveToKilling(taskId);
+    clock.advance(Amount.of(100L, Time.MILLISECONDS));
+    clock.advance(Amount.of(1000L, Time.MILLISECONDS));
+    clock.advance(Amount.of(10000L, Time.MILLISECONDS));
+    assertEquals(2L, statsProvider.getLongValue(KillRetry.RETRIES_COUNTER));
+  }
+
+  @Test
+  public void testDoesNotRetry() {
+    String taskId = "a";
+    expectGetRetryDelay(0, 100);
+
+    storageUtil.expectTaskFetch(killingQuery(taskId));
+
+    control.replay();
+
+    moveToKilling(taskId);
+    clock.advance(Amount.of(100L, Time.MILLISECONDS));
+    assertEquals(0L, statsProvider.getLongValue(KillRetry.RETRIES_COUNTER));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java
new file mode 100644
index 0000000..b980a4e
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.reconciliation;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.ImmutableSet;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.twitter.common.quantity.Time.MINUTES;
+
+import static org.apache.aurora.scheduler.reconciliation.TaskReconciler.EXPLICIT_STAT_NAME;
+import static org.apache.aurora.scheduler.reconciliation.TaskReconciler.IMPLICIT_STAT_NAME;
+import static org.apache.aurora.scheduler.reconciliation.TaskReconciler.TASK_TO_PROTO;
+import static org.apache.aurora.scheduler.reconciliation.TaskReconciler.TaskReconcilerSettings;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+
+public class TaskReconcilerTest extends EasyMockTest {
+  private static final Amount<Long, Time> INITIAL_DELAY = Amount.of(10L, MINUTES);
+  private static final Amount<Long, Time> EXPLICIT_SCHEDULE = Amount.of(60L, MINUTES);
+  private static final Amount<Long, Time> IMPLICT_SCHEDULE = Amount.of(180L, MINUTES);
+  private static final Amount<Long, Time> SPREAD = Amount.of(30L, MINUTES);
+  private static final TaskReconcilerSettings SETTINGS = new TaskReconcilerSettings(
+      INITIAL_DELAY,
+      EXPLICIT_SCHEDULE,
+      IMPLICT_SCHEDULE,
+      SPREAD);
+
+  private StorageTestUtil storageUtil;
+  private StatsProvider statsProvider;
+  private Driver driver;
+  private ScheduledExecutorService executorService;
+  private FakeScheduledExecutor clock;
+  private AtomicLong explicitRuns;
+  private AtomicLong implicitRuns;
+
+  @Before
+  public void setUp() {
+    storageUtil = new StorageTestUtil(this);
+    statsProvider = createMock(StatsProvider.class);
+    driver = createMock(Driver.class);
+    executorService = createMock(ScheduledExecutorService.class);
+    explicitRuns = new AtomicLong();
+    implicitRuns = new AtomicLong();
+  }
+
+  @Test
+  public void testExecution() {
+    expect(statsProvider.makeCounter(EXPLICIT_STAT_NAME)).andReturn(explicitRuns);
+    expect(statsProvider.makeCounter(IMPLICIT_STAT_NAME)).andReturn(implicitRuns);
+    clock = FakeScheduledExecutor.scheduleAtFixedRateExecutor(executorService, 2, 5);
+
+    IScheduledTask task = TaskTestUtil.makeTask("id1", TaskTestUtil.JOB);
+    storageUtil.expectOperations();
+    storageUtil.expectTaskFetch(Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES), task)
+        .times(5);
+
+    driver.reconcileTasks(ImmutableSet.of(TASK_TO_PROTO.apply(task)));
+    expectLastCall().times(5);
+
+    driver.reconcileTasks(ImmutableSet.of());
+    expectLastCall().times(2);
+
+    control.replay();
+
+    TaskReconciler reconciler = new TaskReconciler(
+        SETTINGS,
+        storageUtil.storage,
+        driver,
+        executorService,
+        statsProvider);
+
+    reconciler.startAsync().awaitRunning();
+
+    clock.advance(INITIAL_DELAY);
+    assertEquals(1L, explicitRuns.get());
+    assertEquals(0L, implicitRuns.get());
+
+    clock.advance(SPREAD);
+    assertEquals(1L, explicitRuns.get());
+    assertEquals(1L, implicitRuns.get());
+
+    clock.advance(EXPLICIT_SCHEDULE);
+    assertEquals(2L, explicitRuns.get());
+    assertEquals(1L, implicitRuns.get());
+
+    clock.advance(IMPLICT_SCHEDULE);
+    assertEquals(5L, explicitRuns.get());
+    assertEquals(2L, implicitRuns.get());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInvalidImplicitDelay() throws Exception {
+    control.replay();
+
+    new TaskReconcilerSettings(
+        INITIAL_DELAY,
+        EXPLICIT_SCHEDULE,
+        IMPLICT_SCHEDULE,
+        Amount.of(Long.MAX_VALUE, MINUTES));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInvalidExplicitDelay() throws Exception {
+    control.replay();
+
+    new TaskReconcilerSettings(
+        Amount.of(Long.MAX_VALUE, MINUTES),
+        EXPLICIT_SCHEDULE,
+        IMPLICT_SCHEDULE,
+        SPREAD);
+  }
+}


Mime
View raw message