aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [5/8] aurora git commit: Break apart async package and AsyncModule into purpose-specific equivalents.
Date Wed, 22 Jul 2015 19:40:02 GMT
http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java
new file mode 100644
index 0000000..1611a3b
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.reconciliation;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+import com.google.common.eventbus.Subscribe;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.util.BackoffStrategy;
+
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.storage.Storage;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Watches for task transitions into {@link ScheduleStatus#KILLING KILLING} and periodically
+ * retries {@link Driver#killTask(String)} until the task transitions.
+ */
+public class KillRetry implements EventSubscriber {
+  private static final Logger LOG = Logger.getLogger(KillRetry.class.getName());
+
+  @VisibleForTesting
+  static final String RETRIES_COUNTER = "task_kill_retries";
+
+  private final Driver driver;
+  private final Storage storage;
+  private final ScheduledExecutorService executor;
+  private final BackoffStrategy backoffStrategy;
+  private final AtomicLong killRetries;
+
+  @Inject
+  KillRetry(
+      Driver driver,
+      Storage storage,
+      @AsyncExecutor ScheduledExecutorService executor,
+      BackoffStrategy backoffStrategy,
+      StatsProvider statsProvider) {
+
+    this.driver = requireNonNull(driver);
+    this.storage = requireNonNull(storage);
+    this.executor = requireNonNull(executor);
+    this.backoffStrategy = requireNonNull(backoffStrategy);
+    killRetries = statsProvider.makeCounter(RETRIES_COUNTER);
+  }
+
+  @Subscribe
+  public void taskChangedState(TaskStateChange stateChange) {
+    if (stateChange.getNewState() == ScheduleStatus.KILLING) {
+      new KillAttempt(stateChange.getTaskId()).tryLater();
+    }
+  }
+
+  private class KillAttempt implements Runnable {
+    private final String taskId;
+    private final AtomicLong retryInMs = new AtomicLong();
+
+    KillAttempt(String taskId) {
+      this.taskId = taskId;
+    }
+
+    void tryLater() {
+      retryInMs.set(backoffStrategy.calculateBackoffMs(retryInMs.get()));
+      executor.schedule(this, retryInMs.get(), TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void run() {
+      Query.Builder query = Query.taskScoped(taskId).byStatus(ScheduleStatus.KILLING);
+      if (!Iterables.isEmpty(Storage.Util.fetchTasks(storage, query))) {
+        LOG.info("Task " + taskId + " not yet killed, retrying.");
+
+        // Kill did not yet take effect, try again.
+        driver.killTask(taskId);
+        killRetries.incrementAndGet();
+        tryLater();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java
new file mode 100644
index 0000000..406c077
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.reconciliation;
+
+import javax.inject.Singleton;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.PrivateModule;
+import com.google.inject.TypeLiteral;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.args.constraints.Positive;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.BackoffStrategy;
+import com.twitter.common.util.TruncatedBinaryBackoff;
+
+import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.apache.aurora.scheduler.reconciliation.TaskReconciler.TaskReconcilerSettings;
+
+/**
+ * Guice module that wires up task-state reconciliation: timing out tasks stuck in transient
+ * states ({@link TaskTimeout}), retrying kills ({@link KillRetry}), and periodic Mesos task
+ * reconciliation ({@link TaskReconciler}).
+ */
+public class ReconciliationModule extends AbstractModule {
+
+  @CmdLine(name = "transient_task_state_timeout",
+      help = "The amount of time after which to treat a task stuck in a transient state as LOST.")
+  private static final Arg<Amount<Long, Time>> TRANSIENT_TASK_STATE_TIMEOUT =
+      Arg.create(Amount.of(5L, Time.MINUTES));
+
+  @CmdLine(name = "initial_task_kill_retry_interval",
+      help = "When killing a task, retry after this delay if mesos has not responded,"
+          + " backing off up to transient_task_state_timeout")
+  private static final Arg<Amount<Long, Time>> INITIAL_TASK_KILL_RETRY_INTERVAL =
+      Arg.create(Amount.of(5L, Time.SECONDS));
+
+  // A large cluster can see a burst of status updates during reconciliation, so by default the
+  // first run is deferred by one minute to reduce storage contention at scheduler startup.
+  @CmdLine(name = "reconciliation_initial_delay",
+      help = "Initial amount of time to delay task reconciliation after scheduler start up.")
+  private static final Arg<Amount<Long, Time>> RECONCILIATION_INITIAL_DELAY =
+      Arg.create(Amount.of(1L, Time.MINUTES));
+
+  @Positive
+  @CmdLine(name = "reconciliation_explicit_interval",
+      help = "Interval on which scheduler will ask Mesos for status updates of all non-terminal "
+          + "tasks known to scheduler.")
+  private static final Arg<Amount<Long, Time>> RECONCILIATION_EXPLICIT_INTERVAL =
+      Arg.create(Amount.of(60L, Time.MINUTES));
+
+  @Positive
+  @CmdLine(name = "reconciliation_implicit_interval",
+      help = "Interval on which scheduler will ask Mesos for status updates of all non-terminal "
+          + "tasks known to Mesos.")
+  private static final Arg<Amount<Long, Time>> RECONCILIATION_IMPLICIT_INTERVAL =
+      Arg.create(Amount.of(60L, Time.MINUTES));
+
+  @CmdLine(name = "reconciliation_schedule_spread",
+      help = "Difference between explicit and implicit reconciliation intervals intended to "
+          + "create a non-overlapping task reconciliation schedule.")
+  private static final Arg<Amount<Long, Time>> RECONCILIATION_SCHEDULE_SPREAD =
+      Arg.create(Amount.of(30L, Time.MINUTES));
+
+  @Override
+  protected void configure() {
+    installTaskTimeout();
+    installKillRetry();
+    installTaskReconciler();
+  }
+
+  /** Binds {@link TaskTimeout} as a pubsub subscriber and a scheduler-active service. */
+  private void installTaskTimeout() {
+    install(new PrivateModule() {
+      @Override
+      protected void configure() {
+        // Kept private so this unqualified Amount binding is only visible to TaskTimeout.
+        bind(new TypeLiteral<Amount<Long, Time>>() { })
+            .toInstance(TRANSIENT_TASK_STATE_TIMEOUT.get());
+        bind(TaskTimeout.class).in(Singleton.class);
+        expose(TaskTimeout.class);
+      }
+    });
+    PubsubEventModule.bindSubscriber(binder(), TaskTimeout.class);
+    SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(TaskTimeout.class);
+  }
+
+  /** Binds {@link KillRetry} with a truncated binary backoff as a pubsub subscriber. */
+  private void installKillRetry() {
+    install(new PrivateModule() {
+      @Override
+      protected void configure() {
+        BackoffStrategy killBackoff = new TruncatedBinaryBackoff(
+            INITIAL_TASK_KILL_RETRY_INTERVAL.get(),
+            TRANSIENT_TASK_STATE_TIMEOUT.get());
+        bind(BackoffStrategy.class).toInstance(killBackoff);
+        bind(KillRetry.class).in(Singleton.class);
+        expose(KillRetry.class);
+      }
+    });
+    PubsubEventModule.bindSubscriber(binder(), KillRetry.class);
+  }
+
+  /** Binds {@link TaskReconciler} and its settings as a scheduler-active service. */
+  private void installTaskReconciler() {
+    install(new PrivateModule() {
+      @Override
+      protected void configure() {
+        TaskReconcilerSettings reconcilerSettings = new TaskReconcilerSettings(
+            RECONCILIATION_INITIAL_DELAY.get(),
+            RECONCILIATION_EXPLICIT_INTERVAL.get(),
+            RECONCILIATION_IMPLICIT_INTERVAL.get(),
+            RECONCILIATION_SCHEDULE_SPREAD.get());
+        bind(TaskReconcilerSettings.class).toInstance(reconcilerSettings);
+        bind(TaskReconciler.class).in(Singleton.class);
+        expose(TaskReconciler.class);
+      }
+    });
+    SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(TaskReconciler.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
new file mode 100644
index 0000000..653e52b
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.reconciliation;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
+
+import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.mesos.Protos;
+
+import static java.util.Objects.requireNonNull;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.twitter.common.quantity.Time.MINUTES;
+
+/**
+ * A task reconciler that periodically triggers Mesos (implicit) and Aurora (explicit) task
+ * reconciliation to synchronize global task states. More on task reconciliation:
+ * http://mesos.apache.org/documentation/latest/reconciliation.
+ */
+public class TaskReconciler extends AbstractIdleService {
+
+  @VisibleForTesting
+  static final String EXPLICIT_STAT_NAME = "reconciliation_explicit_runs";
+
+  @VisibleForTesting
+  static final String IMPLICIT_STAT_NAME = "reconciliation_implicit_runs";
+
+  private final TaskReconcilerSettings settings;
+  private final Storage storage;
+  private final Driver driver;
+  private final ScheduledExecutorService executor;
+  private final AtomicLong explicitRuns;
+  private final AtomicLong implicitRuns;
+
+  /**
+   * Initial delays and periods for the explicit and implicit reconciliation schedules.
+   *
+   * <p>Values are held in milliseconds rather than whole minutes. Converting to minutes would
+   * truncate sub-minute settings: a 30 second interval would become a period of 0, which
+   * {@link ScheduledExecutorService#scheduleAtFixedRate} rejects.
+   */
+  static class TaskReconcilerSettings {
+    private final long explicitDelayMs;
+    private final long implicitDelayMs;
+    private final long explicitIntervalMs;
+    private final long implicitIntervalMs;
+
+    /**
+     * Creates reconciliation settings.
+     *
+     * @param initialDelay Delay before the first explicit reconciliation run.
+     * @param explicitInterval Period between explicit reconciliation runs.
+     * @param implicitInterval Period between implicit reconciliation runs.
+     * @param scheduleSpread Additional delay, on top of {@code initialDelay}, before the first
+     *     implicit reconciliation run.
+     * @throws IllegalArgumentException If either initial delay is negative or either interval is
+     *     not positive.
+     */
+    @VisibleForTesting
+    TaskReconcilerSettings(
+        Amount<Long, Time> initialDelay,
+        Amount<Long, Time> explicitInterval,
+        Amount<Long, Time> implicitInterval,
+        Amount<Long, Time> scheduleSpread) {
+
+      explicitIntervalMs = requireNonNull(explicitInterval).as(Time.MILLISECONDS);
+      implicitIntervalMs = requireNonNull(implicitInterval).as(Time.MILLISECONDS);
+      explicitDelayMs = requireNonNull(initialDelay).as(Time.MILLISECONDS);
+      implicitDelayMs = explicitDelayMs + requireNonNull(scheduleSpread).as(Time.MILLISECONDS);
+      checkArgument(
+          explicitDelayMs >= 0,
+          "Invalid explicit reconciliation delay: " + explicitDelayMs + "ms");
+      checkArgument(
+          implicitDelayMs >= 0,
+          "Invalid implicit reconciliation delay: " + implicitDelayMs + "ms");
+      // Fail fast here rather than letting scheduleAtFixedRate reject the period in startUp().
+      checkArgument(
+          explicitIntervalMs > 0,
+          "Invalid explicit reconciliation interval: " + explicitIntervalMs + "ms");
+      checkArgument(
+          implicitIntervalMs > 0,
+          "Invalid implicit reconciliation interval: " + implicitIntervalMs + "ms");
+    }
+  }
+
+  @Inject
+  TaskReconciler(
+      TaskReconcilerSettings settings,
+      Storage storage,
+      Driver driver,
+      @AsyncExecutor ScheduledExecutorService executor,
+      StatsProvider stats) {
+
+    this.settings = requireNonNull(settings);
+    this.storage = requireNonNull(storage);
+    this.driver = requireNonNull(driver);
+    this.executor = requireNonNull(executor);
+    this.explicitRuns = stats.makeCounter(EXPLICIT_STAT_NAME);
+    this.implicitRuns = stats.makeCounter(IMPLICIT_STAT_NAME);
+  }
+
+  @Override
+  protected void startUp() {
+    // Explicit reconciliation: send Mesos a status entry for every stored task whose status is
+    // in Tasks.SLAVE_ASSIGNED_STATES.
+    executor.scheduleAtFixedRate(
+        () -> {
+          ImmutableSet<Protos.TaskStatus> active = FluentIterable
+              .from(Storage.Util.fetchTasks(
+                  storage,
+                  Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES)))
+              .transform(TASK_TO_PROTO)
+              .toSet();
+
+          driver.reconcileTasks(active);
+          explicitRuns.incrementAndGet();
+        },
+        settings.explicitDelayMs,
+        settings.explicitIntervalMs,
+        Time.MILLISECONDS.getTimeUnit());
+
+    // Implicit reconciliation: an empty status set is passed to the driver (see the Mesos
+    // reconciliation documentation linked in the class Javadoc).
+    executor.scheduleAtFixedRate(
+        () -> {
+          driver.reconcileTasks(ImmutableSet.of());
+          implicitRuns.incrementAndGet();
+        },
+        settings.implicitDelayMs,
+        settings.implicitIntervalMs,
+        Time.MILLISECONDS.getTimeUnit());
+  }
+
+  @Override
+  protected void shutDown() {
+    // Nothing to do - await VM shutdown.
+  }
+
+  @VisibleForTesting
+  static final Function<IScheduledTask, Protos.TaskStatus> TASK_TO_PROTO =
+      t -> Protos.TaskStatus.newBuilder()
+          // TODO(maxim): State is required by protobuf but ignored by Mesos for reconciliation
+          // purposes. This is the artifact of the native API. The new HTTP Mesos API will be
+          // accepting task IDs instead. AURORA-1326 tracks solution on the scheduler side.
+          // Setting TASK_RUNNING as a safe dummy value here.
+          .setState(Protos.TaskState.TASK_RUNNING)
+          .setTaskId(Protos.TaskID.newBuilder().setValue(t.getAssignedTask().getTaskId()).build())
+          .build();
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
new file mode 100644
index 0000000..fb83972
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.reconciliation;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
+
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.state.StateChangeResult;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.scheduler.storage.Storage.MutateWork;
+
+/**
+ * Observes task transitions and identifies tasks that are 'stuck' in a transient state.  Stuck
+ * tasks will be transitioned to the LOST state.
+ *
+ * <p>Each transition into one of {@link #TRANSIENT_STATES} schedules a check after the injected
+ * timeout; the check asks the state manager to move the task to LOST, with the transient state
+ * it entered as the expected current state.
+ */
+class TaskTimeout extends AbstractIdleService implements EventSubscriber {
+  private static final Logger LOG = Logger.getLogger(TaskTimeout.class.getName());
+
+  @VisibleForTesting
+  static final Amount<Long, Time> NOT_STARTED_RETRY = Amount.of(5L, Time.SECONDS);
+
+  @VisibleForTesting
+  static final String TIMED_OUT_TASKS_COUNTER = "timed_out_tasks";
+
+  @VisibleForTesting
+  static final Optional<String> TIMEOUT_MESSAGE = Optional.of("Task timed out");
+
+  @VisibleForTesting
+  static final Set<ScheduleStatus> TRANSIENT_STATES = EnumSet.of(
+      ScheduleStatus.ASSIGNED,
+      ScheduleStatus.PREEMPTING,
+      ScheduleStatus.RESTARTING,
+      ScheduleStatus.KILLING,
+      ScheduleStatus.DRAINING);
+
+  private final ScheduledExecutorService executor;
+  private final Storage storage;
+  private final StateManager stateManager;
+  private final Amount<Long, Time> timeout;
+  private final AtomicLong timedOutTasks;
+
+  @Inject
+  TaskTimeout(
+      @AsyncExecutor ScheduledExecutorService executor,
+      Storage storage,
+      StateManager stateManager,
+      Amount<Long, Time> timeout,
+      StatsProvider statsProvider) {
+
+    this.executor = requireNonNull(executor);
+    this.storage = requireNonNull(storage);
+    this.stateManager = requireNonNull(stateManager);
+    this.timeout = requireNonNull(timeout);
+    this.timedOutTasks = statsProvider.makeCounter(TIMED_OUT_TASKS_COUNTER);
+  }
+
+  private static boolean isTransient(ScheduleStatus status) {
+    return TRANSIENT_STATES.contains(status);
+  }
+
+  @Override
+  protected void startUp() {
+    // No work to do here for startup, however we leverage the state tracking in
+    // AbstractIdleService.
+  }
+
+  @Override
+  protected void shutDown() {
+    // Nothing to do for shutting down.
+  }
+
+  /**
+   * Attempts to move one task from the transient state it was observed entering to LOST. While
+   * this service is not running, the attempt is rescheduled after {@link #NOT_STARTED_RETRY}.
+   */
+  private class TimedOutTaskHandler implements Runnable {
+    private final String taskId;
+    // The transient state the task entered; passed as the expected state of the transition.
+    private final ScheduleStatus transientState;
+
+    TimedOutTaskHandler(String taskId, ScheduleStatus transientState) {
+      this.taskId = taskId;
+      this.transientState = transientState;
+    }
+
+    @Override
+    public void run() {
+      if (isRunning()) {
+        // This query acts as a CAS by including the state that we expect the task to be in
+        // if the timeout is still valid.  Ideally, the future would have already been
+        // canceled, but in the event of a state transition race, including transientState
+        // prevents an unintended task timeout.
+        // Note: This requires LOST transitions trigger Driver.killTask.
+        StateChangeResult result = storage.write(new MutateWork.Quiet<StateChangeResult>() {
+          @Override
+          public StateChangeResult apply(Storage.MutableStoreProvider storeProvider) {
+            return stateManager.changeState(
+                storeProvider,
+                taskId,
+                Optional.of(transientState),
+                ScheduleStatus.LOST,
+                TIMEOUT_MESSAGE);
+          }
+        });
+
+        if (result == StateChangeResult.SUCCESS) {
+          LOG.info("Timeout reached for task " + taskId + " in state " + transientState);
+          timedOutTasks.incrementAndGet();
+        }
+      } else {
+        // Our service is not yet started.  We don't want to lose track of the task, so
+        // we will try again later.
+        LOG.fine("Retrying timeout of task " + taskId + " in " + NOT_STARTED_RETRY);
+        executor.schedule(
+            this,
+            NOT_STARTED_RETRY.getValue(),
+            NOT_STARTED_RETRY.getUnit().getTimeUnit());
+      }
+    }
+  }
+
+  /**
+   * Schedules a timeout check for every transition into a transient state.
+   *
+   * @param change The observed task state transition.
+   */
+  @Subscribe
+  public void recordStateChange(TaskStateChange change) {
+    if (isTransient(change.getNewState())) {
+      executor.schedule(
+          new TimedOutTaskHandler(change.getTaskId(), change.getNewState()),
+          timeout.getValue(),
+          timeout.getUnit().getTimeUnit());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculator.java b/src/main/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculator.java
new file mode 100644
index 0000000..bfc23cd
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/RescheduleCalculator.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.BackoffStrategy;
+import com.twitter.common.util.Random;
+
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.gen.ScheduleStatus.DRAINING;
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
+
+/**
+ * Calculates scheduling delays for tasks.
+ */
+public interface RescheduleCalculator {
+  /**
+   * Calculates the delay, in milliseconds, before the task should be considered eligible for
+   * (re)scheduling at scheduler startup.
+   *
+   * @param task Task to calculate delay for.
+   * @return Delay in msec.
+   */
+  long getStartupScheduleDelayMs(IScheduledTask task);
+
+  /**
+   * Calculates the penalty, in milliseconds, that a task should be penalized before being
+   * eligible for rescheduling.
+   *
+   * @param task Task to calculate delay for.
+   * @return Delay in msec.
+   */
+  long getFlappingPenaltyMs(IScheduledTask task);
+
+  /**
+   * Default calculator. The startup delay is a random jitter in [0, max startup delay) plus the
+   * flapping penalty. The flapping penalty walks the task's ancestor chain and grows via the
+   * configured backoff strategy for each consecutive ancestor that flapped.
+   */
+  class RescheduleCalculatorImpl implements RescheduleCalculator {
+
+    private static final Logger LOG = Logger.getLogger(RescheduleCalculatorImpl.class.getName());
+
+    private final Storage storage;
+    private final RescheduleCalculatorSettings settings;
+    // TODO(wfarner): Inject 'random' in the constructor for better test coverage.
+    private final Random random = new Random.SystemRandom(new java.util.Random());
+
+    private static final Predicate<ScheduleStatus> IS_ACTIVE_STATUS =
+        Predicates.in(Tasks.ACTIVE_STATES);
+
+    private static final Set<ScheduleStatus> INTERRUPTED_TASK_STATES =
+        EnumSet.of(RESTARTING, KILLING, DRAINING);
+
+    /**
+     * Matches a terminated task whose terminal event came less than the flapping threshold after
+     * its most recent active-state event. Tasks that passed through an interrupted state (such as
+     * a user restart) never match.
+     */
+    private final Predicate<IScheduledTask> flapped = new Predicate<IScheduledTask>() {
+      @Override
+      public boolean apply(IScheduledTask task) {
+        // With no recorded events there is nothing to measure; the emptiness check also keeps
+        // Iterables.get(events, 0) below from throwing.
+        if (!task.isSetTaskEvents() || task.getTaskEvents().isEmpty()) {
+          return false;
+        }
+
+        // Reversed so the terminal (most recent) event comes first.
+        List<ITaskEvent> events = Lists.reverse(task.getTaskEvents());
+
+        // Avoid penalizing tasks that were interrupted by outside action, such as a user
+        // restarting them.
+        if (Iterables.any(Iterables.transform(events, Tasks.TASK_EVENT_TO_STATUS),
+            Predicates.in(INTERRUPTED_TASK_STATES))) {
+          return false;
+        }
+
+        ITaskEvent terminalEvent = Iterables.get(events, 0);
+        ScheduleStatus terminalState = terminalEvent.getStatus();
+        Preconditions.checkState(Tasks.isTerminated(terminalState));
+
+        ITaskEvent activeEvent = Iterables.find(
+            events,
+            Predicates.compose(IS_ACTIVE_STATUS, Tasks.TASK_EVENT_TO_STATUS));
+
+        long thresholdMs = settings.flappingTaskThreshold.as(Time.MILLISECONDS);
+
+        return (terminalEvent.getTimestamp() - activeEvent.getTimestamp()) < thresholdMs;
+      }
+    };
+
+    /** Tunables for {@link RescheduleCalculatorImpl}. */
+    @VisibleForTesting
+    public static class RescheduleCalculatorSettings {
+      private final BackoffStrategy flappingTaskBackoff;
+      private final Amount<Long, Time> flappingTaskThreshold;
+      private final Amount<Integer, Time>  maxStartupRescheduleDelay;
+
+      /**
+       * @param flappingTaskBackoff Backoff used to grow the penalty per flapping ancestor.
+       * @param flappingTaskThreshold Active periods shorter than this count as flapping.
+       * @param maxStartupRescheduleDelay Exclusive upper bound of the random startup jitter; a
+       *     non-positive value disables the jitter.
+       */
+      public RescheduleCalculatorSettings(
+          BackoffStrategy flappingTaskBackoff,
+          Amount<Long, Time> flappingTaskThreshold,
+          Amount<Integer, Time> maxStartupRescheduleDelay) {
+
+        this.flappingTaskBackoff = requireNonNull(flappingTaskBackoff);
+        this.flappingTaskThreshold = requireNonNull(flappingTaskThreshold);
+        this.maxStartupRescheduleDelay = requireNonNull(maxStartupRescheduleDelay);
+      }
+    }
+
+    @Inject
+    RescheduleCalculatorImpl(Storage storage, RescheduleCalculatorSettings settings) {
+      this.storage = requireNonNull(storage);
+      this.settings = requireNonNull(settings);
+    }
+
+    @Override
+    public long getStartupScheduleDelayMs(IScheduledTask task) {
+      int maxJitterMs = settings.maxStartupRescheduleDelay.as(Time.MILLISECONDS).intValue();
+      // The wrapped java.util.Random.nextInt(bound) rejects non-positive bounds, so a zero
+      // maximum means "no jitter" instead of an IllegalArgumentException.
+      long jitterMs = maxJitterMs > 0 ? random.nextInt(maxJitterMs) : 0L;
+      return jitterMs + getFlappingPenaltyMs(task);
+    }
+
+    /** Looks up the task this one was rescheduled from, if it is recorded and still stored. */
+    private Optional<IScheduledTask> getTaskAncestor(IScheduledTask task) {
+      if (!task.isSetAncestorId()) {
+        return Optional.absent();
+      }
+
+      Iterable<IScheduledTask> res =
+          Storage.Util.fetchTasks(storage, Query.taskScoped(task.getAncestorId()));
+
+      return Optional.fromNullable(Iterables.getOnlyElement(res, null));
+    }
+
+    @Override
+    public long getFlappingPenaltyMs(IScheduledTask task) {
+      Optional<IScheduledTask> curTask = getTaskAncestor(task);
+      long penaltyMs = 0;
+      while (curTask.isPresent() && flapped.apply(curTask.get())) {
+        LOG.info(
+            String.format("Ancestor of %s flapped: %s", Tasks.id(task), Tasks.id(curTask.get())));
+        long newPenalty = settings.flappingTaskBackoff.calculateBackoffMs(penaltyMs);
+        // If the backoff strategy is truncated then there is no need for us to continue.
+        if (newPenalty == penaltyMs) {
+          break;
+        }
+        penaltyMs = newPenalty;
+        curTask = getTaskAncestor(curTask.get());
+      }
+
+      return penaltyMs;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
new file mode 100644
index 0000000..c7a1a46
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import javax.inject.Singleton;
+
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.inject.AbstractModule;
+import com.google.inject.PrivateModule;
+import com.google.inject.TypeLiteral;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.args.constraints.Positive;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.TruncatedBinaryBackoff;
+
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.apache.aurora.scheduler.preemptor.BiCache;
+import org.apache.aurora.scheduler.scheduling.RescheduleCalculator.RescheduleCalculatorImpl;
+
+/**
+ * Binding module for task scheduling logic.
+ * <p>
+ * Builds {@link TaskGroups}, {@link RescheduleCalculator}, {@link TaskScheduler} and
+ * {@link TaskThrottler} from command-line settings, and registers {@link TaskGroups},
+ * {@link TaskScheduler} and {@link TaskThrottler} as pubsub event subscribers.
+ */
+public class SchedulingModule extends AbstractModule {
+
+  @CmdLine(name = "max_schedule_attempts_per_sec",
+      help = "Maximum number of scheduling attempts to make per second.")
+  private static final Arg<Double> MAX_SCHEDULE_ATTEMPTS_PER_SEC = Arg.create(40D);
+
+  @CmdLine(name = "flapping_task_threshold",
+      help = "A task that repeatedly runs for less than this time is considered to be flapping.")
+  private static final Arg<Amount<Long, Time>> FLAPPING_THRESHOLD =
+      Arg.create(Amount.of(5L, Time.MINUTES));
+
+  @CmdLine(name = "initial_flapping_task_delay",
+      help = "Initial amount of time to wait before attempting to schedule a flapping task.")
+  private static final Arg<Amount<Long, Time>> INITIAL_FLAPPING_DELAY =
+      Arg.create(Amount.of(30L, Time.SECONDS));
+
+  @CmdLine(name = "max_flapping_task_delay",
+      help = "Maximum delay between attempts to schedule a flapping task.")
+  private static final Arg<Amount<Long, Time>> MAX_FLAPPING_DELAY =
+      Arg.create(Amount.of(5L, Time.MINUTES));
+
+  @CmdLine(name = "max_reschedule_task_delay_on_startup",
+      help = "Upper bound of random delay for pending task rescheduling on scheduler startup.")
+  private static final Arg<Amount<Integer, Time>> MAX_RESCHEDULING_DELAY =
+      Arg.create(Amount.of(30, Time.SECONDS));
+
+  @Positive
+  @CmdLine(name = "first_schedule_delay",
+      help = "Initial amount of time to wait before first attempting to schedule a PENDING task.")
+  private static final Arg<Amount<Long, Time>> FIRST_SCHEDULE_DELAY =
+      Arg.create(Amount.of(1L, Time.MILLISECONDS));
+
+  @Positive
+  @CmdLine(name = "initial_schedule_penalty",
+      help = "Initial amount of time to wait before attempting to schedule a task that has failed"
+          + " to schedule.")
+  private static final Arg<Amount<Long, Time>> INITIAL_SCHEDULE_PENALTY =
+      Arg.create(Amount.of(1L, Time.SECONDS));
+
+  @CmdLine(name = "max_schedule_penalty",
+      help = "Maximum delay between attempts to schedule PENDING tasks.")
+  private static final Arg<Amount<Long, Time>> MAX_SCHEDULE_PENALTY =
+      Arg.create(Amount.of(1L, Time.MINUTES));
+
+  @CmdLine(name = "offer_reservation_duration", help = "Time to reserve a slave's offers while "
+      + "trying to satisfy a task preempting another.")
+  private static final Arg<Amount<Long, Time>> RESERVATION_DURATION =
+      Arg.create(Amount.of(3L, Time.MINUTES));
+
+  @Override
+  protected void configure() {
+    // Task grouping and penalty calculation.  The settings bindings stay private to this
+    // sub-module; only RescheduleCalculator and TaskGroups are exposed.
+    install(new PrivateModule() {
+      @Override
+      protected void configure() {
+        bind(TaskGroups.TaskGroupsSettings.class).toInstance(new TaskGroups.TaskGroupsSettings(
+            FIRST_SCHEDULE_DELAY.get(),
+            new TruncatedBinaryBackoff(
+                INITIAL_SCHEDULE_PENALTY.get(),
+                MAX_SCHEDULE_PENALTY.get()),
+            RateLimiter.create(MAX_SCHEDULE_ATTEMPTS_PER_SEC.get())));
+
+        bind(RescheduleCalculatorImpl.RescheduleCalculatorSettings.class)
+            .toInstance(new RescheduleCalculatorImpl.RescheduleCalculatorSettings(
+                new TruncatedBinaryBackoff(INITIAL_FLAPPING_DELAY.get(), MAX_FLAPPING_DELAY.get()),
+                FLAPPING_THRESHOLD.get(),
+                MAX_RESCHEDULING_DELAY.get()));
+
+        bind(RescheduleCalculator.class).to(RescheduleCalculatorImpl.class).in(Singleton.class);
+        expose(RescheduleCalculator.class);
+        bind(TaskGroups.class).in(Singleton.class);
+        expose(TaskGroups.class);
+      }
+    });
+    PubsubEventModule.bindSubscriber(binder(), TaskGroups.class);
+
+    // Scheduling attempts.  TaskSchedulerImpl uses the BiCache as its reservation table, keyed by
+    // slave ID with the task group the slave is held for as the value.
+    install(new PrivateModule() {
+      @Override
+      protected void configure() {
+        bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).in(Singleton.class);
+        bind(BiCache.BiCacheSettings.class).toInstance(
+            new BiCache.BiCacheSettings(RESERVATION_DURATION.get(), "reservation_cache_size"));
+        bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class);
+        bind(TaskScheduler.TaskSchedulerImpl.class).in(Singleton.class);
+        expose(TaskScheduler.class);
+      }
+    });
+    PubsubEventModule.bindSubscriber(binder(), TaskScheduler.class);
+
+    // Delayed THROTTLED -> PENDING transitions.
+    install(new PrivateModule() {
+      @Override
+      protected void configure() {
+        bind(TaskThrottler.class).in(Singleton.class);
+        expose(TaskThrottler.class);
+      }
+    });
+    PubsubEventModule.bindSubscriber(binder(), TaskThrottler.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroup.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroup.java
new file mode 100644
index 0000000..5d31955
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroup.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import java.util.Queue;
+import java.util.Set;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+
+/**
+ * A group of task IDs that are eligible for scheduling, but may be waiting for a backoff to expire.
+ */
+class TaskGroup {
+  private final TaskGroupKey key;
+  private long penaltyMs;
+  private final Queue<String> tasks;
+
+  TaskGroup(TaskGroupKey key, String initialTaskId) {
+    this.key = key;
+    this.penaltyMs = 0;
+    this.tasks = Lists.newLinkedList();
+    this.tasks.add(initialTaskId);
+  }
+
+  synchronized TaskGroupKey getKey() {
+    return key;
+  }
+
+  synchronized Optional<String> peek() {
+    return Optional.fromNullable(tasks.peek());
+  }
+
+  synchronized boolean hasMore() {
+    return !tasks.isEmpty();
+  }
+
+  synchronized void remove(String taskId) {
+    tasks.remove(taskId);
+  }
+
+  synchronized void offer(String taskId) {
+    tasks.offer(taskId);
+  }
+
+  synchronized void setPenaltyMs(long penaltyMs) {
+    this.penaltyMs = penaltyMs;
+  }
+
+  // Begin methods used for debug interfaces.
+
+  public synchronized String getName() {
+    return key.toString();
+  }
+
+  public synchronized Set<String> getTaskIds() {
+    return ImmutableSet.copyOf(tasks);
+  }
+
+  public synchronized long getPenaltyMs() {
+    return penaltyMs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
new file mode 100644
index 0000000..3f262bf
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.RateLimiter;
+import com.twitter.common.application.ShutdownRegistry;
+import com.twitter.common.base.Command;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.SlidingStats;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.BackoffStrategy;
+import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
+
+import org.apache.aurora.scheduler.base.AsyncUtil;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+
+/**
+ * A collection of task groups, where a task group is a collection of tasks that are known to be
+ * equal in the way they schedule. This is expected to be tasks associated with the same job key
+ * that also have {@code equal()} {@link org.apache.aurora.scheduler.storage.entities.ITaskConfig}
+ * values.
+ * <p>
+ * This is used to prevent redundant work in trying to schedule tasks as well as to provide
+ * nearly-equal responsiveness when scheduling across jobs.  In other words, a 1000 instance job
+ * cannot starve a 1 instance job.
+ * <p>
+ * Each registered group is driven by its own self-rescheduling runnable on {@code executor}, and
+ * is unregistered once it has no tasks left.
+ */
+public class TaskGroups implements EventSubscriber {
+
+  private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName());
+
+  // Groups with a running evaluation loop.  Entries are only added or removed while holding this
+  // object's intrinsic lock (taskChangedState and evaluateGroupLater).
+  private final ConcurrentMap<TaskGroupKey, TaskGroup> groups = Maps.newConcurrentMap();
+  private final ScheduledExecutorService executor;
+  // The injected scheduler, wrapped so that every attempt first acquires a rate limiter permit.
+  private final TaskScheduler taskScheduler;
+  // Milliseconds to wait after a successful attempt when tasks remain, and before the first
+  // attempt of a group created by a PENDING transition.
+  private final long firstScheduleDelay;
+  // Produces the next group penalty after a failed scheduling attempt.
+  private final BackoffStrategy backoff;
+  private final RescheduleCalculator rescheduleCalculator;
+
+  // Track the penalties of tasks at the time they were scheduled. This is to provide data that
+  // may influence the selection of a different backoff strategy.
+  private final SlidingStats scheduledTaskPenalties =
+      new SlidingStats("scheduled_task_penalty", "ms");
+
+  /**
+   * Settings bundle consumed by the injectable constructor.
+   */
+  public static class TaskGroupsSettings {
+    private final Amount<Long, Time> firstScheduleDelay;
+    private final BackoffStrategy taskGroupBackoff;
+    private final RateLimiter rateLimiter;
+
+    public TaskGroupsSettings(
+        Amount<Long, Time> firstScheduleDelay,
+        BackoffStrategy taskGroupBackoff,
+        RateLimiter rateLimiter) {
+
+      this.firstScheduleDelay = requireNonNull(firstScheduleDelay);
+      this.taskGroupBackoff = requireNonNull(taskGroupBackoff);
+      this.rateLimiter = requireNonNull(rateLimiter);
+    }
+  }
+
+  /**
+   * Creates task groups backed by a scheduling executor whose shutdown is registered with
+   * {@code shutdownRegistry}.
+   */
+  @Inject
+  TaskGroups(
+      ShutdownRegistry shutdownRegistry,
+      TaskGroupsSettings settings,
+      TaskScheduler taskScheduler,
+      RescheduleCalculator rescheduleCalculator) {
+
+    this(
+        createThreadPool(shutdownRegistry),
+        settings.firstScheduleDelay,
+        settings.taskGroupBackoff,
+        settings.rateLimiter,
+        taskScheduler,
+        rescheduleCalculator);
+  }
+
+  /**
+   * @param executor Executor running every group's evaluation loop.
+   * @param firstScheduleDelay Delay before first/next attempts; must be positive.
+   * @param backoff Penalty growth applied after failed attempts.
+   * @param rateLimiter Limits the rate of calls into {@code taskScheduler}.
+   * @param taskScheduler Scheduler performing the actual attempts.
+   * @param rescheduleCalculator Supplies the initial delay for tasks found PENDING on startup.
+   */
+  @VisibleForTesting
+  TaskGroups(
+      final ScheduledExecutorService executor,
+      final Amount<Long, Time> firstScheduleDelay,
+      final BackoffStrategy backoff,
+      final RateLimiter rateLimiter,
+      final TaskScheduler taskScheduler,
+      final RescheduleCalculator rescheduleCalculator) {
+
+    requireNonNull(firstScheduleDelay);
+    Preconditions.checkArgument(firstScheduleDelay.getValue() > 0);
+
+    this.executor = requireNonNull(executor);
+    requireNonNull(rateLimiter);
+    requireNonNull(taskScheduler);
+    this.firstScheduleDelay = firstScheduleDelay.as(Time.MILLISECONDS);
+    this.backoff = requireNonNull(backoff);
+    this.rescheduleCalculator = requireNonNull(rescheduleCalculator);
+
+    this.taskScheduler = new TaskScheduler() {
+      @Override
+      public boolean schedule(String taskId) {
+        rateLimiter.acquire();
+        return taskScheduler.schedule(taskId);
+      }
+    };
+  }
+
+  private synchronized void evaluateGroupLater(Runnable evaluate, TaskGroup group) {
+    // Avoid check-then-act by holding the intrinsic lock.  If not done atomically, we could
+    // remove a group while a task is being added to it.
+    if (group.hasMore()) {
+      executor.schedule(evaluate, group.getPenaltyMs(), MILLISECONDS);
+    } else {
+      groups.remove(group.getKey());
+    }
+  }
+
+  /**
+   * Starts the self-rescheduling evaluation loop for {@code group}.  Each run attempts the task at
+   * the head of the group.  On success the task is removed and, if tasks remain, the next attempt
+   * follows after {@code firstScheduleDelay}; on failure the penalty grows via {@code backoff}.
+   * Once the group is empty the loop ends and the group is unregistered.
+   */
+  private void startGroup(final TaskGroup group) {
+    Runnable monitor = new Runnable() {
+      @Override
+      public void run() {
+        // NOTE(review): if schedule() or the backoff throws, evaluateGroupLater is never reached,
+        // so this loop stops while the group stays registered.  TaskSchedulerImpl.schedule
+        // catches RuntimeException, which limits this in practice.
+        Optional<String> taskId = group.peek();
+        long penaltyMs = 0;
+        if (taskId.isPresent()) {
+          if (taskScheduler.schedule(taskId.get())) {
+            scheduledTaskPenalties.accumulate(group.getPenaltyMs());
+            group.remove(taskId.get());
+            if (group.hasMore()) {
+              penaltyMs = firstScheduleDelay;
+            }
+          } else {
+            penaltyMs = backoff.calculateBackoffMs(group.getPenaltyMs());
+          }
+        }
+
+        group.setPenaltyMs(penaltyMs);
+        evaluateGroupLater(this, group);
+      }
+    };
+    evaluateGroupLater(monitor, group);
+  }
+
+  /**
+   * Creates the executor for group evaluation.  It exports the executor's queue size as the
+   * {@code schedule_queue_size} stat, and registers a shutdown action that runs
+   * {@link ExecutorServiceShutdown} with a one-second timeout argument.
+   */
+  private static ScheduledExecutorService createThreadPool(ShutdownRegistry shutdownRegistry) {
+    final ScheduledThreadPoolExecutor executor =
+        AsyncUtil.singleThreadLoggingScheduledExecutor("TaskScheduler-%d", LOG);
+
+    Stats.exportSize("schedule_queue_size", executor.getQueue());
+    shutdownRegistry.addAction(new Command() {
+      @Override
+      public void execute() {
+        new ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute();
+      }
+    });
+    return executor;
+  }
+
+  /**
+   * Informs the task groups of a task state change.
+   * <p>
+   * This is used to observe {@link org.apache.aurora.gen.ScheduleStatus#PENDING} tasks and begin
+   * attempting to schedule them.
+   *
+   * @param stateChange State change notification.
+   */
+  @Subscribe
+  public synchronized void taskChangedState(TaskStateChange stateChange) {
+    if (stateChange.getNewState() == PENDING) {
+      IScheduledTask task = stateChange.getTask();
+      TaskGroupKey key = TaskGroupKey.from(task.getAssignedTask().getTask());
+      // Check-then-act is safe: entries of groups are only added or removed while holding this
+      // object's lock, which this synchronized method holds.
+      TaskGroup existing = groups.get(key);
+      if (existing == null) {
+        // Compute the initial penalty BEFORE registering the group.  The startup calculation can
+        // throw (it reads storage).  A group registered before that point would never have its
+        // evaluation loop started, and every later task with this key would be stranded in it.
+        long penaltyMs;
+        if (stateChange.isTransition()) {
+          penaltyMs = firstScheduleDelay;
+        } else {
+          penaltyMs = rescheduleCalculator.getStartupScheduleDelayMs(task);
+        }
+        TaskGroup newGroup = new TaskGroup(key, Tasks.id(task));
+        newGroup.setPenaltyMs(penaltyMs);
+        groups.put(key, newGroup);
+        startGroup(newGroup);
+      } else {
+        existing.offer(Tasks.id(task));
+      }
+    }
+  }
+
+  /**
+   * Signals the scheduler that tasks have been deleted.  Deleted tasks are removed from their
+   * groups; an emptied group is unregistered by its own evaluation loop.
+   *
+   * @param deleted Tasks deleted event.
+   */
+  @Subscribe
+  public synchronized void tasksDeleted(TasksDeleted deleted) {
+    for (IAssignedTask task
+        : Iterables.transform(deleted.getTasks(), Tasks.SCHEDULED_TO_ASSIGNED)) {
+      TaskGroup group = groups.get(TaskGroupKey.from(task.getTask()));
+      if (group != null) {
+        group.remove(task.getTaskId());
+      }
+    }
+  }
+
+  /**
+   * Returns an immutable point-in-time copy of the registered groups.
+   */
+  public Iterable<TaskGroup> getGroups() {
+    return ImmutableSet.copyOf(groups.values());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
new file mode 100644
index 0000000..04e5063
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+import javax.inject.Qualifier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
+import com.google.common.eventbus.Subscribe;
+import com.twitter.common.inject.TimedInterceptor.Timed;
+import com.twitter.common.stats.Stats;
+
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.preemptor.BiCache;
+import org.apache.aurora.scheduler.preemptor.Preemptor;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.state.TaskAssigner;
+import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+
+/**
+ * Enables scheduling and preemption of tasks.
+ */
+public interface TaskScheduler extends EventSubscriber {
+
+  /**
+   * Attempts to schedule a task, possibly performing irreversible actions.
+   *
+   * @param taskId The task to attempt to schedule.
+   * @return {@code true} if the task was scheduled, {@code false} otherwise. The caller should
+   *         call schedule again if {@code false} is returned.
+   */
+  boolean schedule(String taskId);
+
+  /**
+   * An asynchronous task scheduler.  Scheduling of tasks is performed on a delay, where each task
+   * backs off after a failed scheduling attempt.
+   * <p>
+   * Pending tasks are advertised to the scheduler via internal pubsub notifications.
+   */
+  class TaskSchedulerImpl implements TaskScheduler {
+    /**
+     * Binding annotation for the time duration of reservations.
+     */
+    @VisibleForTesting
+    @Qualifier
+    @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+    public @interface ReservationDuration { }
+
+    private static final Logger LOG = Logger.getLogger(TaskSchedulerImpl.class.getName());
+
+    private final Storage storage;
+    private final StateManager stateManager;
+    private final TaskAssigner assigner;
+    private final OfferManager offerManager;
+    private final Preemptor preemptor;
+    private final BiCache<String, TaskGroupKey> reservations;
+
+    private final AtomicLong attemptsFired = Stats.exportLong("schedule_attempts_fired");
+    private final AtomicLong attemptsFailed = Stats.exportLong("schedule_attempts_failed");
+    private final AtomicLong attemptsNoMatch = Stats.exportLong("schedule_attempts_no_match");
+
+    @Inject
+    TaskSchedulerImpl(
+        Storage storage,
+        StateManager stateManager,
+        TaskAssigner assigner,
+        OfferManager offerManager,
+        Preemptor preemptor,
+        BiCache<String, TaskGroupKey> reservations) {
+
+      this.storage = requireNonNull(storage);
+      this.stateManager = requireNonNull(stateManager);
+      this.assigner = requireNonNull(assigner);
+      this.offerManager = requireNonNull(offerManager);
+      this.preemptor = requireNonNull(preemptor);
+      this.reservations = requireNonNull(reservations);
+    }
+
+    private Function<HostOffer, Assignment> getAssignerFunction(
+        final MutableStoreProvider storeProvider,
+        final ResourceRequest resourceRequest,
+        final String taskId) {
+
+      // TODO(wfarner): Turn this into Predicate<Offer>, and in the caller, find the first match
+      // and perform the assignment at the very end.  This will allow us to use optimistic locking
+      // at the top of the stack and avoid holding the write lock for too long.
+      return new Function<HostOffer, Assignment>() {
+        @Override
+        public Assignment apply(HostOffer offer) {
+          Optional<TaskGroupKey> reservation =
+              reservations.get(offer.getOffer().getSlaveId().getValue());
+
+          if (reservation.isPresent()) {
+            if (TaskGroupKey.from(resourceRequest.getTask()).equals(reservation.get())) {
+              // Slave is reserved to satisfy this task group.
+              return assigner.maybeAssign(storeProvider, offer, resourceRequest, taskId);
+            } else {
+              // Slave is reserved for another task.
+              return Assignment.failure();
+            }
+          } else {
+            // Slave is not reserved.
+            return assigner.maybeAssign(storeProvider, offer, resourceRequest, taskId);
+          }
+        }
+      };
+    }
+
+    @VisibleForTesting
+    static final Optional<String> LAUNCH_FAILED_MSG =
+        Optional.of("Unknown exception attempting to schedule task.");
+
+    @Timed("task_schedule_attempt")
+    @Override
+    public boolean schedule(final String taskId) {
+      attemptsFired.incrementAndGet();
+      try {
+        return storage.write(new MutateWork.Quiet<Boolean>() {
+          @Override
+          public Boolean apply(MutableStoreProvider store) {
+            return scheduleTask(store, taskId);
+          }
+        });
+      } catch (RuntimeException e) {
+        // We catch the generic unchecked exception here to ensure tasks are not abandoned
+        // if there is a transient issue resulting in an unchecked exception.
+        LOG.log(Level.WARNING, "Task scheduling unexpectedly failed, will be retried", e);
+        attemptsFailed.incrementAndGet();
+        return false;
+      }
+    }
+
+    @Timed("task_schedule_attempt_locked")
+    protected boolean scheduleTask(MutableStoreProvider store, String taskId) {
+      LOG.fine("Attempting to schedule task " + taskId);
+      IAssignedTask assignedTask = Iterables.getOnlyElement(
+          Iterables.transform(
+              store.getTaskStore().fetchTasks(Query.taskScoped(taskId).byStatus(PENDING)),
+              Tasks.SCHEDULED_TO_ASSIGNED),
+          null);
+
+      if (assignedTask == null) {
+        LOG.warning("Failed to look up task " + taskId + ", it may have been deleted.");
+      } else {
+        ITaskConfig task = assignedTask.getTask();
+        AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob());
+        try {
+          boolean launched = offerManager.launchFirst(
+              getAssignerFunction(store, new ResourceRequest(task, aggregate), taskId),
+              TaskGroupKey.from(task));
+
+          if (!launched) {
+            // Task could not be scheduled.
+            // TODO(maxim): Now that preemption slots are searched asynchronously, consider
+            // retrying a launch attempt within the current scheduling round IFF a reservation is
+            // available.
+            maybePreemptFor(assignedTask, aggregate, store);
+            attemptsNoMatch.incrementAndGet();
+            return false;
+          }
+        } catch (OfferManager.LaunchException e) {
+          LOG.log(Level.WARNING, "Failed to launch task.", e);
+          attemptsFailed.incrementAndGet();
+
+          // The attempt to schedule the task failed, so we need to backpedal on the
+          // assignment.
+          // It is in the LOST state and a new task will move to PENDING to replace it.
+          // Should the state change fail due to storage issues, that's okay.  The task will
+          // time out in the ASSIGNED state and be moved to LOST.
+          stateManager.changeState(
+              store,
+              taskId,
+              Optional.of(PENDING),
+              LOST,
+              LAUNCH_FAILED_MSG);
+        }
+      }
+
+      return true;
+    }
+
+    private void maybePreemptFor(
+        IAssignedTask task,
+        AttributeAggregate jobState,
+        MutableStoreProvider storeProvider) {
+
+      if (!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) {
+        return;
+      }
+      Optional<String> slaveId = preemptor.attemptPreemptionFor(task, jobState, storeProvider);
+      if (slaveId.isPresent()) {
+        reservations.put(slaveId.get(), TaskGroupKey.from(task.getTask()));
+      }
+    }
+
+    @Subscribe
+    public void taskChanged(final TaskStateChange stateChangeEvent) {
+      if (Optional.of(PENDING).equals(stateChangeEvent.getOldState())) {
+        IAssignedTask assigned = stateChangeEvent.getTask().getAssignedTask();
+        if (assigned.getSlaveId() != null) {
+          reservations.remove(assigned.getSlaveId(), TaskGroupKey.from(assigned.getTask()));
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
new file mode 100644
index 0000000..b86bd28
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.Subscribe;
+import com.twitter.common.stats.SlidingStats;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+
+/**
+ * A holding area for throttled tasks.  When a task enters
+ * {@link org.apache.aurora.gen.ScheduleStatus#THROTTLED}, a transition back to
+ * {@link org.apache.aurora.gen.ScheduleStatus#PENDING} is scheduled for when its penalty period
+ * (as dictated by {@link RescheduleCalculator}) has expired.
+ */
+class TaskThrottler implements EventSubscriber {
+
+  private final RescheduleCalculator rescheduleCalculator;
+  private final Clock clock;
+  private final ScheduledExecutorService executor;
+  private final Storage storage;
+  private final StateManager stateManager;
+
+  // Distribution of how long throttled tasks actually wait, in milliseconds.
+  private final SlidingStats throttleStats = new SlidingStats("task_throttle", "ms");
+
+  @Inject
+  TaskThrottler(
+      RescheduleCalculator rescheduleCalculator,
+      Clock clock,
+      @AsyncExecutor ScheduledExecutorService executor,
+      Storage storage,
+      StateManager stateManager) {
+
+    this.rescheduleCalculator = requireNonNull(rescheduleCalculator);
+    this.clock = requireNonNull(clock);
+    this.executor = requireNonNull(executor);
+    this.storage = requireNonNull(storage);
+    this.stateManager = requireNonNull(stateManager);
+  }
+
+  @Subscribe
+  public void taskChangedState(final TaskStateChange stateChange) {
+    if (stateChange.getNewState() != THROTTLED) {
+      return;
+    }
+
+    // The penalty is measured from the task's latest event, so time already spent counts.
+    long eventTimestampMs = Tasks.getLatestEvent(stateChange.getTask()).getTimestamp();
+    long readyAtMs =
+        eventTimestampMs + rescheduleCalculator.getFlappingPenaltyMs(stateChange.getTask());
+    long delayMs = Math.max(0, readyAtMs - clock.nowMillis());
+    throttleStats.accumulate(delayMs);
+    executor.schedule(releaseToPending(stateChange), delayMs, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Returns a task that moves the changed task from THROTTLED to PENDING in a storage write.
+   */
+  private Runnable releaseToPending(final TaskStateChange stateChange) {
+    return new Runnable() {
+      @Override
+      public void run() {
+        storage.write(new Storage.MutateWork.NoResult.Quiet() {
+          @Override
+          protected void execute(Storage.MutableStoreProvider storeProvider) {
+            stateManager.changeState(
+                storeProvider,
+                stateChange.getTaskId(),
+                Optional.of(THROTTLED),
+                PENDING,
+                Optional.absent());
+          }
+        });
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index ff33d0b..08844d0 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -45,12 +45,12 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskEvent;
 import org.apache.aurora.scheduler.TaskIdGenerator;
-import org.apache.aurora.scheduler.async.RescheduleCalculator;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
 import org.apache.aurora.scheduler.state.SideEffect.Action;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.TaskStore;

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
index 825e772..b96c84c 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
@@ -31,9 +31,9 @@ import com.twitter.common.quantity.Time;
 import org.apache.aurora.gen.ResourceAggregate;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.SchedulerServicesModule;
-import org.apache.aurora.scheduler.async.OfferManager;
 import org.apache.aurora.scheduler.base.Conversions;
 import org.apache.aurora.scheduler.configuration.Resources;
+import org.apache.aurora.scheduler.offers.OfferManager;
 import org.apache.aurora.scheduler.stats.SlotSizeCounter.MachineResource;
 import org.apache.aurora.scheduler.stats.SlotSizeCounter.MachineResourceProvider;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java b/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
index 4ed6b15..5384307 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
@@ -29,13 +29,6 @@ import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.Clock;
 
 import org.apache.aurora.scheduler.AppStartup;
-import org.apache.aurora.scheduler.async.preemptor.Preemptor;
-import org.apache.aurora.scheduler.filter.SchedulingFilter;
-import org.apache.aurora.scheduler.mesos.Driver;
-import org.apache.aurora.scheduler.state.MaintenanceController;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.state.TaskAssigner;
-import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.junit.Before;
@@ -73,14 +66,7 @@ public class AsyncModuleTest extends EasyMockTest {
           protected void configure() {
             bind(StatsProvider.class).toInstance(statsProvider);
             bindMock(Clock.class);
-            bindMock(Driver.class);
-            bindMock(SchedulingFilter.class);
-            bindMock(MaintenanceController.class);
-            bindMock(Preemptor.class);
-            bindMock(StateManager.class);
-            bindMock(TaskAssigner.class);
             bindMock(Thread.UncaughtExceptionHandler.class);
-            bind(Storage.class).toInstance(storageUtil.storage);
           }
         });
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java
deleted file mode 100644
index cb549a1..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.concurrent.ScheduledExecutorService;
-
-import com.google.common.collect.ImmutableSet;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.Clock;
-
-import org.apache.aurora.gen.JobKey;
-import org.apache.aurora.gen.JobUpdateKey;
-import org.apache.aurora.scheduler.async.JobUpdateHistoryPruner.HistoryPrunerSettings;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
-import org.junit.Test;
-
-import static org.easymock.EasyMock.expect;
-
-public class JobUpdateHistoryPrunerTest extends EasyMockTest {  // Periodic update-history pruning.
-  @Test
-  public void testExecution() throws Exception {
-    StorageTestUtil storageUtil = new StorageTestUtil(this);
-    storageUtil.expectOperations();
-
-    final ScheduledExecutorService executor = createMock(ScheduledExecutorService.class);
-    FakeScheduledExecutor executorClock =
-        FakeScheduledExecutor.scheduleAtFixedRateExecutor(executor, 2);  // 2: presumably run count.
-
-    Clock mockClock = createMock(Clock.class);
-    expect(mockClock.nowMillis()).andReturn(2L).times(2);  // Presumably one read per prune run.
-
-    expect(storageUtil.jobUpdateStore.pruneHistory(1, 1))  // First run prunes one update.
-        .andReturn(ImmutableSet.of(
-            IJobUpdateKey.build(
-                new JobUpdateKey().setJob(new JobKey("role", "env", "job")).setId("id1"))));
-    expect(storageUtil.jobUpdateStore.pruneHistory(1, 1)).andReturn(ImmutableSet.of());  // Then none.
-
-    control.replay();
-
-    executorClock.assertEmpty();  // Nothing scheduled before the service starts.
-    JobUpdateHistoryPruner pruner = new JobUpdateHistoryPruner(
-        mockClock,
-        executor,
-        storageUtil.storage,
-        new HistoryPrunerSettings(
-            Amount.of(1L, Time.MILLISECONDS),
-            Amount.of(1L, Time.MILLISECONDS),
-            1));
-
-    pruner.startAsync().awaitRunning();
-    executorClock.advance(Amount.of(1L, Time.MILLISECONDS));  // Triggers the first prune.
-    executorClock.advance(Amount.of(1L, Time.MILLISECONDS));  // Triggers the second prune.
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java b/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java
deleted file mode 100644
index a295fe8..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.concurrent.ScheduledExecutorService;
-
-import javax.inject.Singleton;
-
-import com.google.common.eventbus.EventBus;
-import com.google.common.testing.TearDown;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.twitter.common.application.modules.LifecycleModule;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.StatsProvider;
-import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.BackoffStrategy;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.events.PubsubEventModule;
-import org.apache.aurora.scheduler.mesos.Driver;
-import org.apache.aurora.scheduler.state.PubsubTestUtil;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
-import org.apache.aurora.scheduler.testing.FakeStatsProvider;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.ScheduleStatus.KILLING;
-import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-
-public class KillRetryTest extends EasyMockTest {  // Backoff-driven kill retries for KILLING tasks.
-
-  private Driver driver;
-  private StorageTestUtil storageUtil;
-  private BackoffStrategy backoffStrategy;
-  private FakeScheduledExecutor clock;
-  private EventBus eventBus;
-  private FakeStatsProvider statsProvider;
-
-  @Before
-  public void setUp() throws Exception {
-    driver = createMock(Driver.class);
-    storageUtil = new StorageTestUtil(this);
-    storageUtil.expectOperations();
-    backoffStrategy = createMock(BackoffStrategy.class);
-    final ScheduledExecutorService executorMock = createMock(ScheduledExecutorService.class);
-    clock = FakeScheduledExecutor.scheduleExecutor(executorMock);  // Time moves only via advance().
-    addTearDown(new TearDown() {
-      @Override
-      public void tearDown() {
-        clock.assertEmpty();  // No scheduled work may remain at the end of a test.
-      }
-    });
-    statsProvider = new FakeStatsProvider();
-
-    Injector injector = Guice.createInjector(
-        new LifecycleModule(),
-        new PubsubEventModule(false),
-        new AbstractModule() {
-          @Override
-          protected void configure() {
-            bind(Driver.class).toInstance(driver);
-            bind(Storage.class).toInstance(storageUtil.storage);
-            bind(ScheduledExecutorService.class).toInstance(executorMock);
-            PubsubEventModule.bindSubscriber(binder(), KillRetry.class);  // Receive posted events.
-            bind(KillRetry.class).in(Singleton.class);
-            bind(BackoffStrategy.class).toInstance(backoffStrategy);
-            bind(StatsProvider.class).toInstance(statsProvider);
-            bind(UncaughtExceptionHandler.class)
-                .toInstance(createMock(UncaughtExceptionHandler.class));
-          }
-        }
-    );
-    eventBus = injector.getInstance(EventBus.class);
-    PubsubTestUtil.startPubsub(injector);
-  }
-
-  private static IScheduledTask makeTask(String id, ScheduleStatus status) {  // ID + status only.
-    return IScheduledTask.build(new ScheduledTask()
-        .setStatus(status)
-        .setAssignedTask(new AssignedTask().setTaskId(id)));
-  }
-
-  private void moveToKilling(String taskId) {  // Posts a KILLING task change (prior state RUNNING).
-    eventBus.post(TaskStateChange.transition(makeTask(taskId, KILLING), RUNNING));
-  }
-
-  private static Query.Builder killingQuery(String taskId) {  // Matches the task only while KILLING.
-    return Query.taskScoped(taskId).byStatus(KILLING);
-  }
-
-  private void expectGetRetryDelay(long prevRetryMs, long retryInMs) {  // prev delay -> next delay.
-    expect(backoffStrategy.calculateBackoffMs(prevRetryMs)).andReturn(retryInMs);
-  }
-
-  private void expectRetry(String taskId, long prevRetryMs, long nextRetryMs) {
-    storageUtil.expectTaskFetch(killingQuery(taskId), makeTask(taskId, KILLING));  // Still KILLING,
-    driver.killTask(taskId);  // so the kill is re-issued
-    expectGetRetryDelay(prevRetryMs, nextRetryMs);  // and the next retry is scheduled.
-  }
-
-  @Test
-  public void testRetries() {
-    String taskId = "a";
-    expectGetRetryDelay(0, 100);  // Initial delay when the task enters KILLING.
-    expectRetry(taskId, 100, 1000);
-    expectRetry(taskId, 1000, 10000);
-
-    // An empty KILLING fetch signals the task has transitioned, ending retries.
-    storageUtil.expectTaskFetch(killingQuery(taskId));
-
-    control.replay();
-
-    moveToKilling(taskId);
-    clock.advance(Amount.of(100L, Time.MILLISECONDS));
-    clock.advance(Amount.of(1000L, Time.MILLISECONDS));
-    clock.advance(Amount.of(10000L, Time.MILLISECONDS));
-    assertEquals(2L, statsProvider.getLongValue(KillRetry.RETRIES_COUNTER));  // Two re-issued kills.
-  }
-
-  @Test
-  public void testDoesNotRetry() {
-    String taskId = "a";
-    expectGetRetryDelay(0, 100);
-
-    storageUtil.expectTaskFetch(killingQuery(taskId));  // Task already left KILLING at first check.
-
-    control.replay();
-
-    moveToKilling(taskId);
-    clock.advance(Amount.of(100L, Time.MILLISECONDS));
-    assertEquals(0L, statsProvider.getLongValue(KillRetry.RETRIES_COUNTER));
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/OfferManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/OfferManagerImplTest.java
deleted file mode 100644
index 874a124..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/OfferManagerImplTest.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.logging.Level;
-
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.testing.TearDown;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.testing.easymock.EasyMockTest;
-
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.JobKey;
-import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.async.OfferManager.OfferManagerImpl;
-import org.apache.aurora.scheduler.async.OfferManager.OfferReturnDelay;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
-import org.apache.aurora.scheduler.mesos.Driver;
-import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
-import org.apache.mesos.Protos.TaskInfo;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
-import static org.apache.aurora.gen.MaintenanceMode.NONE;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class OfferManagerImplTest extends EasyMockTest {  // Offer ordering, bans, return and decline.
-
-  private static final Amount<Long, Time> RETURN_DELAY = Amount.of(1L, Time.DAYS);
-  private static final String HOST_A = "HOST_A";
-  private static final HostOffer OFFER_A = new HostOffer(
-      Offers.makeOffer("OFFER_A", HOST_A),
-      IHostAttributes.build(new HostAttributes().setMode(NONE)));
-  private static final String HOST_B = "HOST_B";
-  private static final HostOffer OFFER_B = new HostOffer(
-      Offers.makeOffer("OFFER_B", HOST_B),
-      IHostAttributes.build(new HostAttributes().setMode(NONE)));
-  private static final String HOST_C = "HOST_C";
-  private static final HostOffer OFFER_C = new HostOffer(
-      Offers.makeOffer("OFFER_C", HOST_C),
-      IHostAttributes.build(new HostAttributes().setMode(NONE)));
-  private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(
-      ITaskConfig.build(new TaskConfig().setJob(new JobKey("role", "env", "name"))));
-
-  private Driver driver;
-  private FakeScheduledExecutor clock;
-  private Function<HostOffer, Assignment> offerAcceptor;
-  private OfferManagerImpl offerManager;
-
-  @Before
-  public void setUp() {
-    offerManager.LOG.setLevel(Level.FINE);  // NOTE(review): offerManager is null here; LOG is
-    addTearDown(new TearDown() {  // presumably static, so OfferManagerImpl.LOG would be clearer.
-      @Override
-      public void tearDown() throws Exception {
-        offerManager.LOG.setLevel(Level.INFO);
-      }
-    });
-    driver = createMock(Driver.class);
-    ScheduledExecutorService executorMock = createMock(ScheduledExecutorService.class);
-    clock = FakeScheduledExecutor.scheduleExecutor(executorMock);  // Time moves only via advance().
-
-    addTearDown(new TearDown() {
-      @Override
-      public void tearDown() throws Exception {
-        clock.assertEmpty();  // No scheduled offer returns may remain pending.
-      }
-    });
-    offerAcceptor = createMock(new Clazz<Function<HostOffer, Assignment>>() { });
-    OfferReturnDelay returnDelay = new OfferReturnDelay() {
-      @Override
-      public Amount<Long, Time> get() {
-        return RETURN_DELAY;
-      }
-    };
-    offerManager = new OfferManagerImpl(driver, returnDelay, executorMock);
-  }
-
-  @Test
-  public void testOffersSorted() throws Exception {
-    // Ensures that non-DRAINING offers are preferred - the DRAINING offer would be tried last.
-
-    HostOffer offerA = setMode(OFFER_A, DRAINING);
-    HostOffer offerC = setMode(OFFER_C, DRAINING);
-
-    TaskInfo task = TaskInfo.getDefaultInstance();
-    expect(offerAcceptor.apply(OFFER_B)).andReturn(Assignment.success(task));
-    driver.launchTask(OFFER_B.getOffer().getId(), task);
-
-    driver.declineOffer(offerA.getOffer().getId());  // Unused DRAINING offers are declined
-    driver.declineOffer(offerC.getOffer().getId());  // once RETURN_DELAY elapses.
-
-    control.replay();
-
-    offerManager.addOffer(offerA);
-    offerManager.addOffer(OFFER_B);
-    offerManager.addOffer(offerC);
-    assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
-    clock.advance(RETURN_DELAY);
-  }
-
-  @Test
-  public void testGetOffersReturnsAllOffers() throws Exception {
-    expect(offerAcceptor.apply(OFFER_A))
-        .andReturn(Assignment.failure(ImmutableSet.of(Veto.constraintMismatch("denied"))));
-
-    control.replay();
-
-    offerManager.addOffer(OFFER_A);
-    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
-    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));  // Vetoed, still held.
-
-    offerManager.cancelOffer(OFFER_A.getOffer().getId());
-    assertTrue(Iterables.isEmpty(offerManager.getOffers()));  // Cancelled offers are removed.
-
-    clock.advance(RETURN_DELAY);
-  }
-
-  @Test
-  public void testOfferFilteringDueToStaticBan() throws Exception {
-    expect(offerAcceptor.apply(OFFER_A))
-        .andReturn(Assignment.failure(ImmutableSet.of(Veto.constraintMismatch("denied"))));
-
-    TaskInfo task = TaskInfo.getDefaultInstance();
-    expect(offerAcceptor.apply(OFFER_B)).andReturn(Assignment.success(task));
-    driver.launchTask(OFFER_B.getOffer().getId(), task);
-
-    driver.declineOffer(OFFER_A.getOffer().getId());
-
-    control.replay();
-
-    offerManager.addOffer(OFFER_A);
-    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
-    // Run again to make sure all offers are banned (via no expectations set).
-    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
-
-    // Add a new offer to accept the task previously banned for OFFER_A.
-    offerManager.addOffer(OFFER_B);
-    assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
-
-    clock.advance(RETURN_DELAY);
-  }
-
-  @Test
-  public void testStaticBanIsCleared() throws Exception {
-    expect(offerAcceptor.apply(OFFER_A))
-        .andReturn(Assignment.failure(ImmutableSet.of(Veto.insufficientResources("ram", 100))));
-
-    TaskInfo task = TaskInfo.getDefaultInstance();
-    expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(task));
-    driver.launchTask(OFFER_A.getOffer().getId(), task);
-
-    expect(offerAcceptor.apply(OFFER_A))
-        .andReturn(Assignment.failure(ImmutableSet.of(Veto.maintenance("draining"))));
-
-    expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(task));
-    driver.launchTask(OFFER_A.getOffer().getId(), task);
-
-    driver.declineOffer(OFFER_A.getOffer().getId());
-
-    control.replay();
-
-    offerManager.addOffer(OFFER_A);
-    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
-
-    // Make sure the static ban is cleared when the offers are returned.
-    clock.advance(RETURN_DELAY);
-    offerManager.addOffer(OFFER_A);
-    assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
-
-    offerManager.addOffer(OFFER_A);
-    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
-
-    // Make sure the static ban is cleared when driver is disconnected.
-    offerManager.driverDisconnected(new DriverDisconnected());
-    offerManager.addOffer(OFFER_A);
-    assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
-
-    clock.advance(RETURN_DELAY);
-  }
-
-  @Test
-  public void testFlushOffers() throws Exception {
-    control.replay();
-
-    offerManager.addOffer(OFFER_A);
-    offerManager.addOffer(OFFER_B);
-    offerManager.driverDisconnected(new DriverDisconnected());  // Drops held offers; none declined.
-    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
-    clock.advance(RETURN_DELAY);
-  }
-
-  @Test
-  public void testDeclineOffer() throws Exception {
-    driver.declineOffer(OFFER_A.getOffer().getId());
-
-    control.replay();
-
-    offerManager.addOffer(OFFER_A);
-    clock.advance(RETURN_DELAY);  // Unused offer is declined once the return delay elapses.
-  }
-
-  private static HostOffer setMode(HostOffer offer, MaintenanceMode mode) {  // Copy with new mode.
-    return new HostOffer(
-        offer.getOffer(),
-        IHostAttributes.build(offer.getAttributes().newBuilder().setMode(mode)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/Offers.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/Offers.java b/src/test/java/org/apache/aurora/scheduler/async/Offers.java
deleted file mode 100644
index 8293dd1..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/Offers.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.SlaveID;
-
-/**
- * Utility class for creating minimal Mesos resource offers in tests.
- */
-final class Offers {
-  private Offers() {
-    // Utility class.
-  }
-
-  static final String DEFAULT_HOST = "hostname";  // Used when no hostname is supplied.
-
-  static Offer makeOffer(String offerId) {  // Builds an offer on DEFAULT_HOST.
-    return Offers.makeOffer(offerId, DEFAULT_HOST);
-  }
-
-  static Offer makeOffer(String offerId, String hostName) {
-    return Offer.newBuilder()
-        .setId(OfferID.newBuilder().setValue(offerId))
-        .setFrameworkId(FrameworkID.newBuilder().setValue("framework_id"))  // Shared by all offers.
-        .setSlaveId(SlaveID.newBuilder().setValue("slave_id-" + offerId))  // Derived from offer ID.
-        .setHostname(hostName)
-        .build();
-  }
-}


Mime
View raw message