aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [7/8] aurora git commit: Break apart async package and AsyncModule into purpose-specific equivalents.
Date Wed, 22 Jul 2015 19:40:04 GMT
http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/TaskReconciler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskReconciler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskReconciler.java
deleted file mode 100644
index 68d2e77..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskReconciler.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.StatsProvider;
-
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.mesos.Driver;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.mesos.Protos;
-
-import static java.util.Objects.requireNonNull;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.twitter.common.quantity.Time.MINUTES;
-
-/**
- * A task reconciler that periodically triggers Mesos (implicit) and Aurora (explicit) task
- * reconciliation to synchronize global task states. More on task reconciliation:
- * http://mesos.apache.org/documentation/latest/reconciliation.
- */
-public class TaskReconciler extends AbstractIdleService {
-
-  @VisibleForTesting
-  static final String EXPLICIT_STAT_NAME = "reconciliation_explicit_runs";
-
-  @VisibleForTesting
-  static final String IMPLICIT_STAT_NAME = "reconciliation_implicit_runs";
-
-  private final TaskReconcilerSettings settings;
-  private final Storage storage;
-  private final Driver driver;
-  private final ScheduledExecutorService executor;
-  private final AtomicLong explicitRuns;
-  private final AtomicLong implicitRuns;
-
-  static class TaskReconcilerSettings {
-    private final Amount<Long, Time> explicitInterval;
-    private final Amount<Long, Time> implicitInterval;
-    private final long explicitDelayMinutes;
-    private final long implicitDelayMinutes;
-
-    @VisibleForTesting
-    TaskReconcilerSettings(
-        Amount<Long, Time> initialDelay,
-        Amount<Long, Time> explicitInterval,
-        Amount<Long, Time> implicitInterval,
-        Amount<Long, Time> scheduleSpread) {
-
-      this.explicitInterval = requireNonNull(explicitInterval);
-      this.implicitInterval = requireNonNull(implicitInterval);
-      explicitDelayMinutes = requireNonNull(initialDelay).as(MINUTES);
-      implicitDelayMinutes = initialDelay.as(MINUTES) + scheduleSpread.as(MINUTES);
-      checkArgument(
-          explicitDelayMinutes >= 0,
-          "Invalid explicit reconciliation delay: " + explicitDelayMinutes);
-      checkArgument(
-          implicitDelayMinutes >= 0L,
-          "Invalid implicit reconciliation delay: " + implicitDelayMinutes);
-    }
-  }
-
-  @Inject
-  TaskReconciler(
-      TaskReconcilerSettings settings,
-      Storage storage,
-      Driver driver,
-      ScheduledExecutorService executor,
-      StatsProvider stats) {
-
-    this.settings = requireNonNull(settings);
-    this.storage = requireNonNull(storage);
-    this.driver = requireNonNull(driver);
-    this.executor = requireNonNull(executor);
-    this.explicitRuns = stats.makeCounter(EXPLICIT_STAT_NAME);
-    this.implicitRuns = stats.makeCounter(IMPLICIT_STAT_NAME);
-  }
-
-  @Override
-  protected void startUp() {
-    // Schedule explicit reconciliation.
-    executor.scheduleAtFixedRate(
-        new Runnable() {
-          @Override
-          public void run() {
-            ImmutableSet<Protos.TaskStatus> active = FluentIterable
-                .from(Storage.Util.fetchTasks(
-                    storage,
-                    Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES)))
-                .transform(TASK_TO_PROTO)
-                .toSet();
-
-            driver.reconcileTasks(active);
-            explicitRuns.incrementAndGet();
-          }
-        },
-        settings.explicitDelayMinutes,
-        settings.explicitInterval.as(MINUTES),
-        MINUTES.getTimeUnit());
-
-    // Schedule implicit reconciliation.
-    executor.scheduleAtFixedRate(
-        new Runnable() {
-          @Override
-          public void run() {
-            driver.reconcileTasks(ImmutableSet.of());
-            implicitRuns.incrementAndGet();
-          }
-        },
-        settings.implicitDelayMinutes,
-        settings.implicitInterval.as(MINUTES),
-        MINUTES.getTimeUnit());
-  }
-
-  @Override
-  protected void shutDown() {
-    // Nothing to do - await VM shutdown.
-  }
-
-  @VisibleForTesting
-  static final Function<IScheduledTask, Protos.TaskStatus> TASK_TO_PROTO =
-      t -> Protos.TaskStatus.newBuilder()
-          // TODO(maxim): State is required by protobuf but ignored by Mesos for reconciliation
-          // purposes. This is the artifact of the native API. The new HTTP Mesos API will be
-          // accepting task IDs instead. AURORA-1326 tracks solution on the scheduler side.
-          // Setting TASK_RUNNING as a safe dummy value here.
-          .setState(Protos.TaskState.TASK_RUNNING)
-          .setTaskId(Protos.TaskID.newBuilder().setValue(t.getAssignedTask().getTaskId()).build())
-          .build();
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
deleted file mode 100644
index a500e55..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-import javax.inject.Qualifier;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterables;
-import com.google.common.eventbus.Subscribe;
-import com.twitter.common.inject.TimedInterceptor.Timed;
-import com.twitter.common.stats.Stats;
-
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.async.preemptor.BiCache;
-import org.apache.aurora.scheduler.async.preemptor.Preemptor;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.state.TaskAssigner;
-import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.gen.ScheduleStatus.LOST;
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-
-/**
- * Enables scheduling and preemption of tasks.
- */
-public interface TaskScheduler extends EventSubscriber {
-
-  /**
-   * Attempts to schedule a task, possibly performing irreversible actions.
-   *
-   * @param taskId The task to attempt to schedule.
-   * @return {@code true} if the task was scheduled, {@code false} otherwise. The caller should
-   *         call schedule again if {@code false} is returned.
-   */
-  boolean schedule(String taskId);
-
-  /**
-   * An asynchronous task scheduler.  Scheduling of tasks is performed on a delay, where each task
-   * backs off after a failed scheduling attempt.
-   * <p>
-   * Pending tasks are advertised to the scheduler via internal pubsub notifications.
-   */
-  class TaskSchedulerImpl implements TaskScheduler {
-    /**
-     * Binding annotation for the time duration of reservations.
-     */
-    @VisibleForTesting
-    @Qualifier
-    @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-    public @interface ReservationDuration { }
-
-    private static final Logger LOG = Logger.getLogger(TaskSchedulerImpl.class.getName());
-
-    private final Storage storage;
-    private final StateManager stateManager;
-    private final TaskAssigner assigner;
-    private final OfferManager offerManager;
-    private final Preemptor preemptor;
-    private final BiCache<String, TaskGroupKey> reservations;
-
-    private final AtomicLong attemptsFired = Stats.exportLong("schedule_attempts_fired");
-    private final AtomicLong attemptsFailed = Stats.exportLong("schedule_attempts_failed");
-    private final AtomicLong attemptsNoMatch = Stats.exportLong("schedule_attempts_no_match");
-
-    @Inject
-    TaskSchedulerImpl(
-        Storage storage,
-        StateManager stateManager,
-        TaskAssigner assigner,
-        OfferManager offerManager,
-        Preemptor preemptor,
-        BiCache<String, TaskGroupKey> reservations) {
-
-      this.storage = requireNonNull(storage);
-      this.stateManager = requireNonNull(stateManager);
-      this.assigner = requireNonNull(assigner);
-      this.offerManager = requireNonNull(offerManager);
-      this.preemptor = requireNonNull(preemptor);
-      this.reservations = requireNonNull(reservations);
-    }
-
-    private Function<HostOffer, Assignment> getAssignerFunction(
-        final MutableStoreProvider storeProvider,
-        final ResourceRequest resourceRequest,
-        final String taskId) {
-
-      // TODO(wfarner): Turn this into Predicate<Offer>, and in the caller, find the first match
-      // and perform the assignment at the very end.  This will allow us to use optimistic locking
-      // at the top of the stack and avoid holding the write lock for too long.
-      return new Function<HostOffer, Assignment>() {
-        @Override
-        public Assignment apply(HostOffer offer) {
-          Optional<TaskGroupKey> reservation =
-              reservations.get(offer.getOffer().getSlaveId().getValue());
-
-          if (reservation.isPresent()) {
-            if (TaskGroupKey.from(resourceRequest.getTask()).equals(reservation.get())) {
-              // Slave is reserved to satisfy this task group.
-              return assigner.maybeAssign(storeProvider, offer, resourceRequest, taskId);
-            } else {
-              // Slave is reserved for another task.
-              return Assignment.failure();
-            }
-          } else {
-            // Slave is not reserved.
-            return assigner.maybeAssign(storeProvider, offer, resourceRequest, taskId);
-          }
-        }
-      };
-    }
-
-    @VisibleForTesting
-    static final Optional<String> LAUNCH_FAILED_MSG =
-        Optional.of("Unknown exception attempting to schedule task.");
-
-    @Timed("task_schedule_attempt")
-    @Override
-    public boolean schedule(final String taskId) {
-      attemptsFired.incrementAndGet();
-      try {
-        return storage.write(new MutateWork.Quiet<Boolean>() {
-          @Override
-          public Boolean apply(MutableStoreProvider store) {
-            return scheduleTask(store, taskId);
-          }
-        });
-      } catch (RuntimeException e) {
-        // We catch the generic unchecked exception here to ensure tasks are not abandoned
-        // if there is a transient issue resulting in an unchecked exception.
-        LOG.log(Level.WARNING, "Task scheduling unexpectedly failed, will be retried", e);
-        attemptsFailed.incrementAndGet();
-        return false;
-      }
-    }
-
-    @Timed("task_schedule_attempt_locked")
-    protected boolean scheduleTask(MutableStoreProvider store, String taskId) {
-      LOG.fine("Attempting to schedule task " + taskId);
-      IAssignedTask assignedTask = Iterables.getOnlyElement(
-          Iterables.transform(
-              store.getTaskStore().fetchTasks(Query.taskScoped(taskId).byStatus(PENDING)),
-              Tasks.SCHEDULED_TO_ASSIGNED),
-          null);
-
-      if (assignedTask == null) {
-        LOG.warning("Failed to look up task " + taskId + ", it may have been deleted.");
-      } else {
-        ITaskConfig task = assignedTask.getTask();
-        AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob());
-        try {
-          boolean launched = offerManager.launchFirst(
-              getAssignerFunction(store, new ResourceRequest(task, aggregate), taskId),
-              TaskGroupKey.from(task));
-
-          if (!launched) {
-            // Task could not be scheduled.
-            // TODO(maxim): Now that preemption slots are searched asynchronously, consider
-            // retrying a launch attempt within the current scheduling round IFF a reservation is
-            // available.
-            maybePreemptFor(assignedTask, aggregate, store);
-            attemptsNoMatch.incrementAndGet();
-            return false;
-          }
-        } catch (OfferManager.LaunchException e) {
-          LOG.log(Level.WARNING, "Failed to launch task.", e);
-          attemptsFailed.incrementAndGet();
-
-          // The attempt to schedule the task failed, so we need to backpedal on the
-          // assignment.
-          // It is in the LOST state and a new task will move to PENDING to replace it.
-          // Should the state change fail due to storage issues, that's okay.  The task will
-          // time out in the ASSIGNED state and be moved to LOST.
-          stateManager.changeState(
-              store,
-              taskId,
-              Optional.of(PENDING),
-              LOST,
-              LAUNCH_FAILED_MSG);
-        }
-      }
-
-      return true;
-    }
-
-    private void maybePreemptFor(
-        IAssignedTask task,
-        AttributeAggregate jobState,
-        MutableStoreProvider storeProvider) {
-
-      if (!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) {
-        return;
-      }
-      Optional<String> slaveId = preemptor.attemptPreemptionFor(task, jobState, storeProvider);
-      if (slaveId.isPresent()) {
-        reservations.put(slaveId.get(), TaskGroupKey.from(task.getTask()));
-      }
-    }
-
-    @Subscribe
-    public void taskChanged(final TaskStateChange stateChangeEvent) {
-      if (Optional.of(PENDING).equals(stateChangeEvent.getOldState())) {
-        IAssignedTask assigned = stateChangeEvent.getTask().getAssignedTask();
-        if (assigned.getSlaveId() != null) {
-          reservations.remove(assigned.getSlaveId(), TaskGroupKey.from(assigned.getTask()));
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java
deleted file mode 100644
index c8f2005..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Optional;
-import com.google.common.eventbus.Subscribe;
-import com.twitter.common.stats.SlidingStats;
-import com.twitter.common.util.Clock;
-
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.Storage;
-
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
-
-/**
- * A holding area for tasks that have been throttled.  Tasks entering the
- * {@link org.apache.aurora.gen.ScheduleStatus#THROTTLED} state will be transitioned to
- * {@link org.apache.aurora.gen.ScheduleStatus#PENDING} after the penalty period (as dictated by
- * {@link RescheduleCalculator} has expired.
- */
-class TaskThrottler implements EventSubscriber {
-
-  private final RescheduleCalculator rescheduleCalculator;
-  private final Clock clock;
-  private final ScheduledExecutorService executor;
-  private final Storage storage;
-  private final StateManager stateManager;
-
-  private final SlidingStats throttleStats = new SlidingStats("task_throttle", "ms");
-
-  @Inject
-  TaskThrottler(
-      RescheduleCalculator rescheduleCalculator,
-      Clock clock,
-      ScheduledExecutorService executor,
-      Storage storage,
-      StateManager stateManager) {
-
-    this.rescheduleCalculator = requireNonNull(rescheduleCalculator);
-    this.clock = requireNonNull(clock);
-    this.executor = requireNonNull(executor);
-    this.storage = requireNonNull(storage);
-    this.stateManager = requireNonNull(stateManager);
-  }
-
-  @Subscribe
-  public void taskChangedState(final TaskStateChange stateChange) {
-    if (stateChange.getNewState() == THROTTLED) {
-      long readyAtMs = Tasks.getLatestEvent(stateChange.getTask()).getTimestamp()
-          + rescheduleCalculator.getFlappingPenaltyMs(stateChange.getTask());
-      long delayMs = Math.max(0, readyAtMs - clock.nowMillis());
-      throttleStats.accumulate(delayMs);
-      executor.schedule(
-          new Runnable() {
-            @Override
-            public void run() {
-              storage.write(new Storage.MutateWork.NoResult.Quiet() {
-                @Override
-                protected void execute(Storage.MutableStoreProvider storeProvider) {
-                  stateManager.changeState(
-                      storeProvider,
-                      stateChange.getTaskId(),
-                      Optional.of(THROTTLED),
-                      PENDING,
-                      Optional.absent());
-                }
-              });
-            }
-          },
-          delayMs,
-          TimeUnit.MILLISECONDS);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
deleted file mode 100644
index e250f33..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.EnumSet;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.eventbus.Subscribe;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.StatsProvider;
-
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.state.StateChangeResult;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.Storage;
-
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.scheduler.storage.Storage.MutateWork;
-
-/**
- * Observes task transitions and identifies tasks that are 'stuck' in a transient state.  Stuck
- * tasks will be transitioned to the LOST state.
- */
-class TaskTimeout extends AbstractIdleService implements EventSubscriber {
-  private static final Logger LOG = Logger.getLogger(TaskTimeout.class.getName());
-
-  @VisibleForTesting
-  static final Amount<Long, Time> NOT_STARTED_RETRY = Amount.of(5L, Time.SECONDS);
-
-  @VisibleForTesting
-  static final String TIMED_OUT_TASKS_COUNTER = "timed_out_tasks";
-
-  @VisibleForTesting
-  static final Optional<String> TIMEOUT_MESSAGE = Optional.of("Task timed out");
-
-  @VisibleForTesting
-  static final Set<ScheduleStatus> TRANSIENT_STATES = EnumSet.of(
-      ScheduleStatus.ASSIGNED,
-      ScheduleStatus.PREEMPTING,
-      ScheduleStatus.RESTARTING,
-      ScheduleStatus.KILLING,
-      ScheduleStatus.DRAINING);
-
-  private final ScheduledExecutorService executor;
-  private final Storage storage;
-  private final StateManager stateManager;
-  private final Amount<Long, Time> timeout;
-  private final AtomicLong timedOutTasks;
-
-  @Inject
-  TaskTimeout(
-      ScheduledExecutorService executor,
-      Storage storage,
-      StateManager stateManager,
-      Amount<Long, Time> timeout,
-      StatsProvider statsProvider) {
-
-    this.executor = requireNonNull(executor);
-    this.storage = requireNonNull(storage);
-    this.stateManager = requireNonNull(stateManager);
-    this.timeout = requireNonNull(timeout);
-    this.timedOutTasks = statsProvider.makeCounter(TIMED_OUT_TASKS_COUNTER);
-  }
-
-  private static boolean isTransient(ScheduleStatus status) {
-    return TRANSIENT_STATES.contains(status);
-  }
-
-  @Override
-  protected void startUp() {
-    // No work to do here for startup, however we leverage the state tracking in
-    // AbstractIdleService.
-  }
-
-  @Override
-  protected void shutDown() {
-    // Nothing to do for shutting down.
-  }
-
-  private class TimedOutTaskHandler implements Runnable {
-    private final String taskId;
-    private final ScheduleStatus newState;
-
-    TimedOutTaskHandler(String taskId, ScheduleStatus newState) {
-      this.taskId = taskId;
-      this.newState = newState;
-    }
-
-    @Override
-    public void run() {
-      if (isRunning()) {
-        // This query acts as a CAS by including the state that we expect the task to be in
-        // if the timeout is still valid.  Ideally, the future would have already been
-        // canceled, but in the event of a state transition race, including transientState
-        // prevents an unintended task timeout.
-        // Note: This requires LOST transitions trigger Driver.killTask.
-        StateChangeResult result = storage.write(new MutateWork.Quiet<StateChangeResult>() {
-          @Override
-          public StateChangeResult apply(Storage.MutableStoreProvider storeProvider) {
-            return stateManager.changeState(
-                storeProvider,
-                taskId,
-                Optional.of(newState),
-                ScheduleStatus.LOST,
-                TIMEOUT_MESSAGE);
-          }
-        });
-
-        if (result == StateChangeResult.SUCCESS) {
-          LOG.info("Timeout reached for task " + taskId + ":" + taskId);
-          timedOutTasks.incrementAndGet();
-        }
-      } else {
-        // Our service is not yet started.  We don't want to lose track of the task, so
-        // we will try again later.
-        LOG.fine("Retrying timeout of task " + taskId + " in " + NOT_STARTED_RETRY);
-        executor.schedule(
-            this,
-            NOT_STARTED_RETRY.getValue(),
-            NOT_STARTED_RETRY.getUnit().getTimeUnit());
-      }
-    }
-  }
-
-  @Subscribe
-  public void recordStateChange(TaskStateChange change) {
-    if (isTransient(change.getNewState())) {
-      executor.schedule(
-          new TimedOutTaskHandler(change.getTaskId(), change.getNewState()),
-          timeout.getValue(),
-          timeout.getUnit().getTimeUnit());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/preemptor/BiCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/BiCache.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/BiCache.java
deleted file mode 100644
index 382099f..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/BiCache.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async.preemptor;
-
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.base.Ticker;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimap;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.StatsProvider;
-import com.twitter.common.util.Clock;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * A bi-directional cache of items. Entries are purged from cache after
- * {@link BiCacheSettings#expireAfter}.
- *
- * @param <K> Key type.
- * @param <V> Value type.
- */
-public class BiCache<K, V> {
-
-  public static class BiCacheSettings {
-    private final Amount<Long, Time> expireAfter;
-    private final String cacheSizeStatName;
-
-    public BiCacheSettings(Amount<Long, Time> expireAfter, String cacheSizeStatName) {
-      this.expireAfter = requireNonNull(expireAfter);
-      this.cacheSizeStatName = requireNonNull(cacheSizeStatName);
-    }
-  }
-
-  private final Cache<K, V> cache;
-  private final Multimap<V, K> inverse = HashMultimap.create();
-
-  @Inject
-  public BiCache(
-      StatsProvider statsProvider,
-      BiCacheSettings settings,
-      final Clock clock) {
-
-    requireNonNull(clock);
-    this.cache = CacheBuilder.newBuilder()
-        .expireAfterWrite(settings.expireAfter.as(Time.MINUTES), TimeUnit.MINUTES)
-        .ticker(new Ticker() {
-          @Override
-          public long read() {
-            return clock.nowNanos();
-          }
-        })
-        .removalListener(new RemovalListener<K, V>() {
-          @Override
-          public void onRemoval(RemovalNotification<K, V> notification) {
-            inverse.remove(notification.getValue(), notification.getKey());
-          }
-        })
-        .build();
-
-    statsProvider.makeGauge(
-        settings.cacheSizeStatName,
-        new Supplier<Long>() {
-          @Override
-          public Long get() {
-            return cache.size();
-          }
-        });
-  }
-
-  /**
-   * Puts a new key/value pair.
-   *
-   * @param key Key to add.
-   * @param value Value to add.
-   */
-  public synchronized void put(K key, V value) {
-    requireNonNull(key);
-    requireNonNull(value);
-    cache.put(key, value);
-    inverse.put(value, key);
-  }
-
-  /**
-   * Gets a cached value by key.
-   *
-   * @param key Key to get value for.
-   * @return Optional of value.
-   */
-  public synchronized Optional<V> get(K key) {
-    return Optional.fromNullable(cache.getIfPresent(key));
-  }
-
-  /**
-   * Gets a set of keys for a given value.
-   *
-   * @param value Value to get all keys for.
-   * @return An {@link Iterable} of keys or empty if value does not exist.
-   */
-  public synchronized Set<K> getByValue(V value) {
-    // Cache items are lazily removed by routine maintenance operations during get/write access.
-    // Forcing cleanup here to ensure proper data integrity.
-    cache.cleanUp();
-    return ImmutableSet.copyOf(inverse.get(value));
-  }
-
-  /**
-   * Removes a key/value pair from cache.
-   *
-   * @param key Key to remove.
-   * @param value Value to remove.
-   */
-  public synchronized void remove(K key, V value) {
-    inverse.remove(value, key);
-    cache.invalidate(key);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
deleted file mode 100644
index 38610b2..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async.preemptor;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Multimap;
-
-/**
- * A facade for the preemptor to gain access to the state of scheduled tasks in the cluster.
- */
-@VisibleForTesting
-public interface ClusterState {
-
-  /**
-   * Gets a snapshot of the active tasks in the cluster, indexed by the slave IDs they are
-   * assigned to.
-   * <p>
-   * TODO(wfarner): Return a more minimal type than IAssignedTask here.
-   *
-   * @return Active tasks and their associated slave IDs.
-   */
-  Multimap<String, PreemptionVictim> getSlavesToActiveTasks();
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterStateImpl.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterStateImpl.java
deleted file mode 100644
index d7a0c54..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterStateImpl.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async.preemptor;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.eventbus.Subscribe;
-
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-
-/**
- * A cached view of cluster state, kept up to date by pubsub notifications.
- */
-public class ClusterStateImpl implements ClusterState, PubsubEvent.EventSubscriber {
-
-  private final Multimap<String, PreemptionVictim> victims =
-      Multimaps.synchronizedMultimap(HashMultimap.create());
-
-  @Override
-  public Multimap<String, PreemptionVictim> getSlavesToActiveTasks() {
-    return Multimaps.unmodifiableMultimap(victims);
-  }
-
-  @Subscribe
-  public void taskChangedState(TaskStateChange stateChange) {
-    synchronized (victims) {
-      String slaveId = stateChange.getTask().getAssignedTask().getSlaveId();
-      PreemptionVictim victim = PreemptionVictim.fromTask(stateChange.getTask().getAssignedTask());
-      if (Tasks.SLAVE_ASSIGNED_STATES.contains(stateChange.getNewState())) {
-        victims.put(slaveId, victim);
-      } else {
-        victims.remove(slaveId, victim);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java
deleted file mode 100644
index 1f1eb4c..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async.preemptor;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.inject.Inject;
-import javax.inject.Qualifier;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multiset;
-import com.google.common.collect.Sets;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.Clock;
-
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.async.OfferManager;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
-
-/**
- * Attempts to find preemption slots for all PENDING tasks eligible for preemption.
- */
-@VisibleForTesting
-public class PendingTaskProcessor implements Runnable {
-  private final Storage storage;
-  private final OfferManager offerManager;
-  private final PreemptionVictimFilter preemptionVictimFilter;
-  private final PreemptorMetrics metrics;
-  private final Amount<Long, Time> preemptionCandidacyDelay;
-  private final BiCache<PreemptionProposal, TaskGroupKey> slotCache;
-  private final ClusterState clusterState;
-  private final Clock clock;
-
-  /**
-   * Binding annotation for the time interval after which a pending task becomes eligible to
-   * preempt other tasks. To avoid excessive churn, the preemptor requires that a task is PENDING
-   * for a duration (dictated by {@link #preemptionCandidacyDelay}) before it becomes eligible
-   * to preempt other tasks.
-   */
-  @VisibleForTesting
-  @Qualifier
-  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-  public @interface PreemptionDelay { }
-
-  @Inject
-  PendingTaskProcessor(
-      Storage storage,
-      OfferManager offerManager,
-      PreemptionVictimFilter preemptionVictimFilter,
-      PreemptorMetrics metrics,
-      @PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay,
-      BiCache<PreemptionProposal, TaskGroupKey> slotCache,
-      ClusterState clusterState,
-      Clock clock) {
-
-    this.storage = requireNonNull(storage);
-    this.offerManager = requireNonNull(offerManager);
-    this.preemptionVictimFilter = requireNonNull(preemptionVictimFilter);
-    this.metrics = requireNonNull(metrics);
-    this.preemptionCandidacyDelay = requireNonNull(preemptionCandidacyDelay);
-    this.slotCache = requireNonNull(slotCache);
-    this.clusterState = requireNonNull(clusterState);
-    this.clock = requireNonNull(clock);
-  }
-
-  @Override
-  public void run() {
-    metrics.recordTaskProcessorRun();
-    storage.read(new Storage.Work.Quiet<Void>() {
-      @Override
-      public Void apply(StoreProvider store) {
-        Multimap<String, PreemptionVictim> slavesToActiveTasks =
-            clusterState.getSlavesToActiveTasks();
-
-        if (slavesToActiveTasks.isEmpty()) {
-          // No preemption victims to consider.
-          return null;
-        }
-
-        // Group the offers by slave id so they can be paired with active tasks from the same slave.
-        Map<String, HostOffer> slavesToOffers =
-            Maps.uniqueIndex(offerManager.getOffers(), OFFER_TO_SLAVE_ID);
-
-        Set<String> allSlaves = Sets.newHashSet(Iterables.concat(
-            slavesToOffers.keySet(),
-            slavesToActiveTasks.keySet()));
-
-        // The algorithm below attempts to find a reservation for every task group by matching
-        // it against all available slaves until a preemption slot is found. Groups are evaluated
-        // in a round-robin fashion to ensure fairness (e.g.: G1, G2, G3, G1, G2).
-        // A slave is removed from further matching once a reservation is made. Similarly, all
-        // identical task group instances are removed from further iteration if none of the
-        // available slaves could yield a preemption proposal. A consuming iterator is used for
-        // task groups to ensure iteration order is preserved after a task group is removed.
-        LoadingCache<IJobKey, AttributeAggregate> jobStates = attributeCache(store);
-        List<TaskGroupKey> pendingGroups = fetchIdlePendingGroups(store);
-        Iterator<TaskGroupKey> groups = Iterators.consumingIterator(pendingGroups.iterator());
-        while (!pendingGroups.isEmpty()) {
-          boolean matched = false;
-          TaskGroupKey group = groups.next();
-          ITaskConfig task = group.getTask();
-
-          metrics.recordPreemptionAttemptFor(task);
-          Iterator<String> slaveIterator = allSlaves.iterator();
-          while (slaveIterator.hasNext()) {
-            String slaveId = slaveIterator.next();
-            Optional<ImmutableSet<PreemptionVictim>> candidates =
-                preemptionVictimFilter.filterPreemptionVictims(
-                    task,
-                    slavesToActiveTasks.get(slaveId),
-                    jobStates.getUnchecked(task.getJob()),
-                    Optional.fromNullable(slavesToOffers.get(slaveId)),
-                    store);
-
-            metrics.recordSlotSearchResult(candidates, task);
-            if (candidates.isPresent()) {
-              // Slot found -> remove slave to avoid multiple task reservations.
-              slaveIterator.remove();
-              slotCache.put(new PreemptionProposal(candidates.get(), slaveId), group);
-              matched = true;
-              break;
-            }
-          }
-          if (!matched) {
-            // No slot found for the group -> remove group and reset group iterator.
-            pendingGroups.removeAll(ImmutableSet.of(group));
-            groups = Iterators.consumingIterator(pendingGroups.iterator());
-          }
-        }
-        return null;
-      }
-    });
-  }
-
-  private List<TaskGroupKey> fetchIdlePendingGroups(StoreProvider store) {
-    Multiset<TaskGroupKey> taskGroupCounts = HashMultiset.create(
-        FluentIterable.from(store.getTaskStore().fetchTasks(Query.statusScoped(PENDING)))
-            .filter(Predicates.and(isIdleTask, Predicates.not(hasCachedSlot)))
-            .transform(Functions.compose(ASSIGNED_TO_GROUP_KEY, SCHEDULED_TO_ASSIGNED)));
-
-    return getPreemptionSequence(taskGroupCounts);
-  }
-
-  /**
-   * Creates execution sequence for pending task groups by interleaving their unique occurrences.
-   * For example: {G1, G1, G1, G2, G2} will be converted into {G1, G2, G1, G2, G1}.
-   *
-   * @param groups Multiset of task groups.
-   * @return A task group execution sequence.
-   */
-  private static List<TaskGroupKey> getPreemptionSequence(Multiset<TaskGroupKey> groups) {
-    Multiset<TaskGroupKey> mutableGroups = HashMultiset.create(groups);
-    List<TaskGroupKey> instructions = Lists.newLinkedList();
-    Set<TaskGroupKey> keys = ImmutableSet.copyOf(groups.elementSet());
-    while (!mutableGroups.isEmpty()) {
-      for (TaskGroupKey key : keys) {
-        if (mutableGroups.contains(key)) {
-          instructions.add(key);
-          mutableGroups.remove(key);
-        }
-      }
-    }
-
-    return instructions;
-  }
-
-  private LoadingCache<IJobKey, AttributeAggregate> attributeCache(final StoreProvider store) {
-    return CacheBuilder.newBuilder().build(CacheLoader.from(
-        new Function<IJobKey, AttributeAggregate>() {
-          @Override
-          public AttributeAggregate apply(IJobKey job) {
-            return AttributeAggregate.getJobActiveState(store, job);
-          }
-        }));
-  }
-
-  private static final Function<IAssignedTask, TaskGroupKey> ASSIGNED_TO_GROUP_KEY =
-      new Function<IAssignedTask, TaskGroupKey>() {
-        @Override
-        public TaskGroupKey apply(IAssignedTask task) {
-          return TaskGroupKey.from(task.getTask());
-        }
-      };
-
-  private final Predicate<IScheduledTask> hasCachedSlot = new Predicate<IScheduledTask>() {
-    @Override
-    public boolean apply(IScheduledTask task) {
-      return !slotCache.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())).isEmpty();
-    }
-  };
-
-  private final Predicate<IScheduledTask> isIdleTask = new Predicate<IScheduledTask>() {
-    @Override
-    public boolean apply(IScheduledTask task) {
-      return (clock.nowMillis() - Tasks.getLatestEvent(task).getTimestamp())
-          >= preemptionCandidacyDelay.as(Time.MILLISECONDS);
-    }
-  };
-
-  private static final Function<HostOffer, String> OFFER_TO_SLAVE_ID =
-      new Function<HostOffer, String>() {
-        @Override
-        public String apply(HostOffer offer) {
-          return offer.getOffer().getSlaveId().getValue();
-        }
-      };
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionProposal.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionProposal.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionProposal.java
deleted file mode 100644
index 7a03168..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionProposal.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async.preemptor;
-
-import java.util.Objects;
-import java.util.Set;
-
-import com.google.common.collect.ImmutableSet;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * A set of tasks proposed for preemption on a given slave.
- */
-class PreemptionProposal {
-  private final Set<PreemptionVictim> victims;
-  private final String slaveId;
-
-  PreemptionProposal(ImmutableSet<PreemptionVictim> victims, String slaveId) {
-    this.victims = requireNonNull(victims);
-    this.slaveId = requireNonNull(slaveId);
-  }
-
-  Set<PreemptionVictim> getVictims() {
-    return victims;
-  }
-
-  String getSlaveId() {
-    return slaveId;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (!(o instanceof PreemptionProposal)) {
-      return false;
-    }
-
-    PreemptionProposal other = (PreemptionProposal) o;
-    return Objects.equals(getVictims(), other.getVictims())
-        && Objects.equals(getSlaveId(), other.getSlaveId());
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(victims, slaveId);
-  }
-
-  @Override
-  public String toString() {
-    return com.google.common.base.Objects.toStringHelper(this)
-        .add("victims", getVictims())
-        .add("slaveId", getSlaveId())
-        .toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictim.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictim.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictim.java
deleted file mode 100644
index f196b21..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictim.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async.preemptor;
-
-import java.util.Objects;
-
-import org.apache.aurora.scheduler.configuration.Resources;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-
-/**
- * A victim to be considered as a candidate for preemption.
- */
-public final class PreemptionVictim {
-  private final String slaveHost;
-  private final boolean production;
-  private final String role;
-  private final int priority;
-  private final Resources resources;
-  private final String taskId;
-
-  private PreemptionVictim(
-      String slaveHost,
-      boolean production,
-      String role,
-      int priority,
-      Resources resources,
-      String taskId) {
-
-    this.slaveHost = slaveHost;
-    this.production = production;
-    this.role = role;
-    this.priority = priority;
-    this.resources = resources;
-    this.taskId = taskId;
-  }
-
-  public static PreemptionVictim fromTask(IAssignedTask task) {
-    ITaskConfig config = task.getTask();
-    return new PreemptionVictim(
-        task.getSlaveHost(),
-        config.isProduction(),
-        config.getJob().getRole(),
-        config.getPriority(),
-        Resources.from(task.getTask()),
-        task.getTaskId());
-  }
-
-  public String getSlaveHost() {
-    return slaveHost;
-  }
-
-  public boolean isProduction() {
-    return production;
-  }
-
-  public String getRole() {
-    return role;
-  }
-
-  public int getPriority() {
-    return priority;
-  }
-
-  public Resources getResources() {
-    return resources;
-  }
-
-  public String getTaskId() {
-    return taskId;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (!(o instanceof PreemptionVictim)) {
-      return false;
-    }
-
-    PreemptionVictim other = (PreemptionVictim) o;
-    return Objects.equals(getSlaveHost(), other.getSlaveHost())
-        && Objects.equals(isProduction(), other.isProduction())
-        && Objects.equals(getRole(), other.getRole())
-        && Objects.equals(getPriority(), other.getPriority())
-        && Objects.equals(getResources(), other.getResources())
-        && Objects.equals(getTaskId(), other.getTaskId());
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(slaveHost, production, role, priority, resources, taskId);
-  }
-
-  @Override
-  public String toString() {
-    return com.google.common.base.Objects.toStringHelper(this)
-        .add("slaveHost", getSlaveHost())
-        .add("production", isProduction())
-        .add("role", getRole())
-        .add("priority", getPriority())
-        .add("resources", getResources())
-        .add("taskId", getTaskId())
-        .toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilter.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilter.java
deleted file mode 100644
index 75e2370..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilter.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async.preemptor;
-
-import java.util.Set;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-import com.google.common.collect.Sets;
-
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.ResourceSlot;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.filter.SchedulingFilter;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
-import org.apache.aurora.scheduler.mesos.ExecutorSettings;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Filters active tasks (victims) and available offer (slack) resources that can accommodate a
- * given task (candidate), provided victims are preempted.
- * <p>
- * A task may preempt another task if the following conditions hold true:
- * <ol>
- *  <li>The resources reserved for a victim (or a set of victims) are sufficient to satisfy
- *    the candidate.
- *  </li>
- *  <li>Both candidate and victim are owned by the same user and the
- *    {@link ITaskConfig#getPriority} of a victim is lower OR a victim is non-production and the
- *    candidate is production.
- *  </li>
- * </ol>
- */
-public interface PreemptionVictimFilter {
-  /**
-   * Returns a set of {@link PreemptionVictim} that can accommodate a given task if preempted.
-   *
-   * @param pendingTask Task to search preemption slot for.
-   * @param victims Active tasks on a slave.
-   * @param attributeAggregate An {@link AttributeAggregate} instance for the task's job.
-   * @param offer A resource offer for a slave.
-   * @param storeProvider A store provider to access task data.
-   * @return A set of {@code PreemptionVictim} instances to preempt for a given task.
-   */
-  Optional<ImmutableSet<PreemptionVictim>> filterPreemptionVictims(
-      ITaskConfig pendingTask,
-      Iterable<PreemptionVictim> victims,
-      AttributeAggregate attributeAggregate,
-      Optional<HostOffer> offer,
-      StoreProvider storeProvider);
-
-  class PreemptionVictimFilterImpl implements PreemptionVictimFilter {
-    private final SchedulingFilter schedulingFilter;
-    private final ExecutorSettings executorSettings;
-    private final PreemptorMetrics metrics;
-
-    @Inject
-    PreemptionVictimFilterImpl(
-        SchedulingFilter schedulingFilter,
-        ExecutorSettings executorSettings,
-        PreemptorMetrics metrics) {
-
-      this.schedulingFilter = requireNonNull(schedulingFilter);
-      this.executorSettings = requireNonNull(executorSettings);
-      this.metrics = requireNonNull(metrics);
-    }
-
-    private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
-        new Function<HostOffer, ResourceSlot>() {
-          @Override
-          public ResourceSlot apply(HostOffer offer) {
-            return ResourceSlot.from(offer.getOffer());
-          }
-        };
-
-    private static final Function<HostOffer, String> OFFER_TO_HOST =
-        new Function<HostOffer, String>() {
-          @Override
-          public String apply(HostOffer offer) {
-            return offer.getOffer().getHostname();
-          }
-        };
-
-    private static final Function<PreemptionVictim, String> VICTIM_TO_HOST =
-        new Function<PreemptionVictim, String>() {
-          @Override
-          public String apply(PreemptionVictim victim) {
-            return victim.getSlaveHost();
-          }
-        };
-
-    private final Function<PreemptionVictim, ResourceSlot> victimToResources =
-        new Function<PreemptionVictim, ResourceSlot>() {
-          @Override
-          public ResourceSlot apply(PreemptionVictim victim) {
-            return ResourceSlot.from(victim, executorSettings);
-          }
-        };
-
-    // TODO(zmanji) Consider using Dominant Resource Fairness for ordering instead of the vector
-    // ordering
-    private final Ordering<PreemptionVictim> resourceOrder =
-        ResourceSlot.ORDER.onResultOf(victimToResources).reverse();
-
-    @Override
-    public Optional<ImmutableSet<PreemptionVictim>> filterPreemptionVictims(
-        ITaskConfig pendingTask,
-        Iterable<PreemptionVictim> possibleVictims,
-        AttributeAggregate jobState,
-        Optional<HostOffer> offer,
-        StoreProvider storeProvider) {
-
-      // This enforces the precondition that all of the resources are from the same host. We need to
-      // get the host for the schedulingFilter.
-      Set<String> hosts = ImmutableSet.<String>builder()
-          .addAll(Iterables.transform(possibleVictims, VICTIM_TO_HOST))
-          .addAll(Iterables.transform(offer.asSet(), OFFER_TO_HOST)).build();
-
-      ResourceSlot slackResources =
-          ResourceSlot.sum(Iterables.transform(offer.asSet(), OFFER_TO_RESOURCE_SLOT));
-
-      FluentIterable<PreemptionVictim> preemptableTasks = FluentIterable.from(possibleVictims)
-          .filter(preemptionFilter(pendingTask));
-
-      if (preemptableTasks.isEmpty()) {
-        return Optional.absent();
-      }
-
-      Set<PreemptionVictim> toPreemptTasks = Sets.newHashSet();
-
-      Iterable<PreemptionVictim> sortedVictims =
-          resourceOrder.immutableSortedCopy(preemptableTasks);
-
-      Optional<IHostAttributes> attributes =
-          storeProvider.getAttributeStore().getHostAttributes(Iterables.getOnlyElement(hosts));
-
-      if (!attributes.isPresent()) {
-        metrics.recordMissingAttributes();
-        return Optional.absent();
-      }
-
-      for (PreemptionVictim victim : sortedVictims) {
-        toPreemptTasks.add(victim);
-
-        ResourceSlot totalResource = ResourceSlot.sum(
-            ResourceSlot.sum(Iterables.transform(toPreemptTasks, victimToResources)),
-            slackResources);
-
-        Set<Veto> vetoes = schedulingFilter.filter(
-            new UnusedResource(totalResource, attributes.get()),
-            new ResourceRequest(pendingTask, jobState));
-
-        if (vetoes.isEmpty()) {
-          return Optional.of(ImmutableSet.copyOf(toPreemptTasks));
-        }
-      }
-      return Optional.absent();
-    }
-
-    /**
-     * Creates a filter that will find tasks that the provided {@code pendingTask} may preempt.
-     *
-     * @param pendingTask A task that is not scheduled to possibly preempt other tasks for.
-     * @return A filter that will compare the priorities and resources required by other tasks
-     *     with {@code preemptableTask}.
-     */
-    private static Predicate<PreemptionVictim> preemptionFilter(final ITaskConfig pendingTask) {
-      return new Predicate<PreemptionVictim>() {
-        @Override
-        public boolean apply(PreemptionVictim possibleVictim) {
-          boolean pendingIsProduction = pendingTask.isProduction();
-          boolean victimIsProduction = possibleVictim.isProduction();
-
-          if (pendingIsProduction && !victimIsProduction) {
-            return true;
-          } else if (pendingIsProduction == victimIsProduction) {
-            // If production flags are equal, preemption is based on priority within the same role.
-            if (pendingTask.getJob().getRole().equals(possibleVictim.getRole())) {
-              return pendingTask.getPriority() > possibleVictim.getPriority();
-            } else {
-              return false;
-            }
-          } else {
-            return false;
-          }
-        }
-      };
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
deleted file mode 100644
index a2d5fcf..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async.preemptor;
-
-import java.util.Set;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.scheduler.async.OfferManager;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.mesos.Protos.SlaveID;
-
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
-
-/**
- * Attempts to preempt active tasks in favor of the provided PENDING task in case a preemption
- * slot has been previously found.
- */
-public interface Preemptor {
-  /**
-   * Preempts victim tasks in case a valid preemption slot exists.
-   *
-   * @param task Preempting task.
-   * @param jobState Current job state aggregate.
-   * @param storeProvider Store provider to use for task preemption.
-   * @return ID of the slave where preemption occurred.
-   */
-  Optional<String> attemptPreemptionFor(
-      IAssignedTask task,
-      AttributeAggregate jobState,
-      MutableStoreProvider storeProvider);
-
-  class PreemptorImpl implements Preemptor {
-    private final StateManager stateManager;
-    private final OfferManager offerManager;
-    private final PreemptionVictimFilter preemptionVictimFilter;
-    private final PreemptorMetrics metrics;
-    private final BiCache<PreemptionProposal, TaskGroupKey> slotCache;
-
-    @Inject
-    PreemptorImpl(
-        StateManager stateManager,
-        OfferManager offerManager,
-        PreemptionVictimFilter preemptionVictimFilter,
-        PreemptorMetrics metrics,
-        BiCache<PreemptionProposal, TaskGroupKey> slotCache) {
-
-      this.stateManager = requireNonNull(stateManager);
-      this.offerManager = requireNonNull(offerManager);
-      this.preemptionVictimFilter = requireNonNull(preemptionVictimFilter);
-      this.metrics = requireNonNull(metrics);
-      this.slotCache = requireNonNull(slotCache);
-    }
-
-    @Override
-    public Optional<String> attemptPreemptionFor(
-        IAssignedTask pendingTask,
-        AttributeAggregate jobState,
-        MutableStoreProvider store) {
-
-      TaskGroupKey groupKey = TaskGroupKey.from(pendingTask.getTask());
-      Set<PreemptionProposal> preemptionProposals = slotCache.getByValue(groupKey);
-
-      // A preemption slot is available -> attempt to preempt tasks.
-      if (!preemptionProposals.isEmpty()) {
-        // Get the next available preemption slot.
-        PreemptionProposal slot = preemptionProposals.iterator().next();
-        slotCache.remove(slot, groupKey);
-
-        // Validate PreemptionProposal is still valid for the given task.
-        SlaveID slaveId = SlaveID.newBuilder().setValue(slot.getSlaveId()).build();
-        Optional<ImmutableSet<PreemptionVictim>> validatedVictims =
-            preemptionVictimFilter.filterPreemptionVictims(
-                pendingTask.getTask(),
-                slot.getVictims(),
-                jobState,
-                offerManager.getOffer(slaveId),
-                store);
-
-        metrics.recordSlotValidationResult(validatedVictims);
-        if (!validatedVictims.isPresent()) {
-          // Previously found victims are no longer valid -> let the next run find a new slot.
-          return Optional.absent();
-        }
-
-        for (PreemptionVictim toPreempt : validatedVictims.get()) {
-          metrics.recordTaskPreemption(toPreempt);
-          stateManager.changeState(
-              store,
-              toPreempt.getTaskId(),
-              Optional.absent(),
-              PREEMPTING,
-              Optional.of("Preempting in favor of " + pendingTask.getTaskId()));
-        }
-        return Optional.of(slot.getSlaveId());
-      }
-
-      return Optional.absent();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java
deleted file mode 100644
index 22a1533..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async.preemptor;
-
-import java.util.Set;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.scheduler.stats.CachedCounters;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Defines methods to manage preemptor metrics.
- */
-@VisibleForTesting
-public class PreemptorMetrics {
-  @VisibleForTesting
-  static final String MISSING_ATTRIBUTES_NAME = "preemptor_missing_attributes";
-
-  @VisibleForTesting
-  static final String TASK_PROCESSOR_RUN_NAME = "preemptor_task_processor_runs";
-
-  private volatile boolean exported = false;
-  private final CachedCounters counters;
-
-  @Inject
-  PreemptorMetrics(CachedCounters counters) {
-    this.counters = requireNonNull(counters);
-    assertFullyExported();
-  }
-
-  private static String prod(boolean production) {
-    return production ? "prod" : "non_prod";
-  }
-
-  private static String result(boolean success) {
-    return success ? "successful" : "failed";
-  }
-
-  private void assertFullyExported() {
-    if (exported) {
-      return;
-    }
-
-    // Dummy-read all stats to ensure they are exported.
-    Set<String> allStats = ImmutableSet.of(
-        attemptsStatName(false),
-        attemptsStatName(true),
-        successStatName(false),
-        successStatName(true),
-        slotSearchStatName(true, false),
-        slotSearchStatName(false, false),
-        slotSearchStatName(true, true),
-        slotSearchStatName(false, true),
-        slotValidationStatName(true),
-        slotValidationStatName(false),
-        MISSING_ATTRIBUTES_NAME,
-        TASK_PROCESSOR_RUN_NAME);
-    for (String stat : allStats) {
-      counters.get(stat);
-    }
-
-    exported = true;
-  }
-
-  private void increment(String stat) {
-    assertFullyExported();
-    counters.get(stat).incrementAndGet();
-  }
-
-  @VisibleForTesting
-  static String attemptsStatName(boolean production) {
-    return "preemptor_slot_search_attempts_for_" + prod(production);
-  }
-
-  @VisibleForTesting
-  static String successStatName(boolean production) {
-    return "preemptor_tasks_preempted_" + prod(production);
-  }
-
-  @VisibleForTesting
-  static String slotSearchStatName(boolean success, boolean production) {
-    return String.format("preemptor_slot_search_%s_for_%s", result(success), prod(production));
-  }
-
-  @VisibleForTesting
-  static String slotValidationStatName(boolean success) {
-    return "preemptor_slot_validation_" + result(success);
-  }
-
-  void recordPreemptionAttemptFor(ITaskConfig task) {
-    increment(attemptsStatName(task.isProduction()));
-  }
-
-  void recordTaskPreemption(PreemptionVictim victim) {
-    increment(successStatName(victim.isProduction()));
-  }
-
-  void recordSlotSearchResult(Optional<?> result, ITaskConfig task) {
-    increment(slotSearchStatName(result.isPresent(), task.isProduction()));
-  }
-
-  void recordSlotValidationResult(Optional<?> result) {
-    increment(slotValidationStatName(result.isPresent()));
-  }
-
-  void recordMissingAttributes() {
-    increment(MISSING_ATTRIBUTES_NAME);
-  }
-
-  void recordTaskProcessorRun() {
-    increment(TASK_PROCESSOR_RUN_NAME);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
deleted file mode 100644
index 3d9e27b..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async.preemptor;
-
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-import javax.inject.Singleton;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.AbstractScheduledService;
-import com.google.inject.AbstractModule;
-import com.google.inject.PrivateModule;
-import com.google.inject.TypeLiteral;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-
-import org.apache.aurora.scheduler.SchedulerServicesModule;
-import org.apache.aurora.scheduler.async.preemptor.BiCache.BiCacheSettings;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.events.PubsubEventModule;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-
-import static java.util.Objects.requireNonNull;
-
-public class PreemptorModule extends AbstractModule {
-
-  private static final Logger LOG = Logger.getLogger(PreemptorModule.class.getName());
-
-  @CmdLine(name = "enable_preemptor",
-      help = "Enable the preemptor and preemption")
-  private static final Arg<Boolean> ENABLE_PREEMPTOR = Arg.create(true);
-
-  @CmdLine(name = "preemption_delay",
-      help = "Time interval after which a pending task becomes eligible to preempt other tasks")
-  private static final Arg<Amount<Long, Time>> PREEMPTION_DELAY =
-      Arg.create(Amount.of(3L, Time.MINUTES));
-
-  @CmdLine(name = "preemption_slot_hold_time",
-      help = "Time to hold a preemption slot found before it is discarded.")
-  private static final Arg<Amount<Long, Time>> PREEMPTION_SLOT_HOLD_TIME =
-      Arg.create(Amount.of(5L, Time.MINUTES));
-
-  @CmdLine(name = "preemption_slot_search_interval",
-      help = "Time interval between pending task preemption slot searches.")
-  private static final Arg<Amount<Long, Time>> PREEMPTION_SLOT_SEARCH_INTERVAL =
-      Arg.create(Amount.of(1L, Time.MINUTES));
-
-  private final boolean enablePreemptor;
-  private final Amount<Long, Time> preemptionDelay;
-  private final Amount<Long, Time> slotSearchInterval;
-
-  @VisibleForTesting
-  public PreemptorModule(
-      boolean enablePreemptor,
-      Amount<Long, Time> preemptionDelay,
-      Amount<Long, Time> slotSearchInterval) {
-
-    this.enablePreemptor = enablePreemptor;
-    this.preemptionDelay = requireNonNull(preemptionDelay);
-    this.slotSearchInterval = requireNonNull(slotSearchInterval);
-  }
-
-  public PreemptorModule() {
-    this(ENABLE_PREEMPTOR.get(), PREEMPTION_DELAY.get(), PREEMPTION_SLOT_SEARCH_INTERVAL.get());
-  }
-
-  @Override
-  protected void configure() {
-    install(new PrivateModule() {
-      @Override
-      protected void configure() {
-        if (enablePreemptor) {
-          LOG.info("Preemptor Enabled.");
-          bind(PreemptorMetrics.class).in(Singleton.class);
-          bind(PreemptionVictimFilter.class)
-              .to(PreemptionVictimFilter.PreemptionVictimFilterImpl.class);
-          bind(PreemptionVictimFilter.PreemptionVictimFilterImpl.class).in(Singleton.class);
-          bind(Preemptor.class).to(Preemptor.PreemptorImpl.class);
-          bind(Preemptor.PreemptorImpl.class).in(Singleton.class);
-          bind(new TypeLiteral<Amount<Long, Time>>() { })
-              .annotatedWith(PendingTaskProcessor.PreemptionDelay.class)
-              .toInstance(preemptionDelay);
-          bind(BiCacheSettings.class).toInstance(
-              new BiCacheSettings(PREEMPTION_SLOT_HOLD_TIME.get(), "preemption_slot_cache_size"));
-          bind(new TypeLiteral<BiCache<PreemptionProposal, TaskGroupKey>>() { })
-              .in(Singleton.class);
-          bind(PendingTaskProcessor.class).in(Singleton.class);
-          bind(ClusterState.class).to(ClusterStateImpl.class);
-          bind(ClusterStateImpl.class).in(Singleton.class);
-          expose(ClusterStateImpl.class);
-
-          bind(PreemptorService.class).in(Singleton.class);
-          bind(AbstractScheduledService.Scheduler.class).toInstance(
-              AbstractScheduledService.Scheduler.newFixedRateSchedule(
-                  0L,
-                  slotSearchInterval.getValue(),
-                  slotSearchInterval.getUnit().getTimeUnit()));
-
-          expose(PreemptorService.class);
-          expose(PendingTaskProcessor.class);
-        } else {
-          bind(Preemptor.class).toInstance(NULL_PREEMPTOR);
-          LOG.warning("Preemptor Disabled.");
-        }
-        expose(Preemptor.class);
-      }
-    });
-
-    // We can't do this in the private module due to the known conflict between multibindings
-    // and private modules due to multiple injectors.  We accept the added complexity here to keep
-    // the other bindings private.
-    PubsubEventModule.bindSubscriber(binder(), ClusterStateImpl.class);
-    if (enablePreemptor) {
-      SchedulerServicesModule.addSchedulerActiveServiceBinding(binder())
-          .to(PreemptorService.class);
-    }
-  }
-
-  static class PreemptorService extends AbstractScheduledService {
-    private final PendingTaskProcessor slotFinder;
-    private final Scheduler schedule;
-
-    @Inject
-    PreemptorService(PendingTaskProcessor slotFinder, Scheduler schedule) {
-      this.slotFinder = requireNonNull(slotFinder);
-      this.schedule = requireNonNull(schedule);
-    }
-
-    @Override
-    protected void runOneIteration() {
-      slotFinder.run();
-    }
-
-    @Override
-    protected Scheduler scheduler() {
-      return schedule;
-    }
-  }
-
-  private static final Preemptor NULL_PREEMPTOR = new Preemptor() {
-    @Override
-    public Optional<String> attemptPreemptionFor(
-        IAssignedTask task,
-        AttributeAggregate jobState,
-        Storage.MutableStoreProvider storeProvider) {
-
-      return Optional.absent();
-    }
-  };
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/http/Offers.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Offers.java b/src/main/java/org/apache/aurora/scheduler/http/Offers.java
index b991616..4329ce1 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/Offers.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/Offers.java
@@ -28,7 +28,7 @@ import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableMap;
 
 import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.async.OfferManager;
+import org.apache.aurora.scheduler.offers.OfferManager;
 import org.apache.mesos.Protos.Attribute;
 import org.apache.mesos.Protos.ExecutorID;
 import org.apache.mesos.Protos.Resource;

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/http/PendingTasks.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/PendingTasks.java b/src/main/java/org/apache/aurora/scheduler/http/PendingTasks.java
index 82b6f50..c80e0c8 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/PendingTasks.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/PendingTasks.java
@@ -22,7 +22,7 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
-import org.apache.aurora.scheduler.async.TaskGroups;
+import org.apache.aurora.scheduler.scheduling.TaskGroups;
 
 /**
  * Servlet that exposes detailed information about tasks that are pending.

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
index 6f06693..7c8a008 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
@@ -33,12 +33,12 @@ import com.twitter.common.inject.TimedInterceptor.Timed;
 import org.apache.aurora.GuiceUtils.AllowUnchecked;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.TaskStatusHandler;
-import org.apache.aurora.scheduler.async.OfferManager;
 import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStatusReceived;
+import org.apache.aurora.scheduler.offers.OfferManager;
 import org.apache.aurora.scheduler.stats.CachedCounters;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;


Mime
View raw message