aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [8/8] aurora git commit: Break apart async package and AsyncModule into purpose-specific equivalents.
Date Wed, 22 Jul 2015 19:40:05 GMT
Break apart async package and AsyncModule into purpose-specific equivalents.

Testing Done:
Confirmed end-to-end tests pass, and ./gradlew run works.

Reviewed at https://reviews.apache.org/r/36666/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/0070a5fd
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/0070a5fd
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/0070a5fd

Branch: refs/heads/master
Commit: 0070a5fd18c6f219a7fe66f327209b8dc21ab67e
Parents: 6e2bf57
Author: Bill Farner <wfarner@apache.org>
Authored: Wed Jul 22 12:39:37 2015 -0700
Committer: Bill Farner <wfarner@apache.org>
Committed: Wed Jul 22 12:39:37 2015 -0700

----------------------------------------------------------------------
 .../org/apache/aurora/benchmark/Offers.java     |   6 +-
 .../aurora/benchmark/SchedulingBenchmarks.java  |  16 +-
 .../aurora/benchmark/StatusUpdateBenchmark.java |   6 +-
 .../benchmark/fakes/FakeOfferManager.java       |   2 +-
 .../fakes/FakeRescheduleCalculator.java         |   2 +-
 .../apache/aurora/scheduler/ResourceSlot.java   |   2 +-
 .../apache/aurora/scheduler/app/AppModule.java  |  10 +-
 .../aurora/scheduler/async/AsyncModule.java     | 298 +-------
 .../scheduler/async/JobUpdateHistoryPruner.java | 105 ---
 .../aurora/scheduler/async/KillRetry.java       | 102 ---
 .../aurora/scheduler/async/OfferManager.java    | 406 -----------
 .../async/RandomJitterReturnDelay.java          |  49 --
 .../scheduler/async/RescheduleCalculator.java   | 174 -----
 .../aurora/scheduler/async/TaskGroup.java       |  77 ---
 .../aurora/scheduler/async/TaskGroups.java      | 239 -------
 .../scheduler/async/TaskHistoryPruner.java      | 174 -----
 .../aurora/scheduler/async/TaskReconciler.java  | 155 -----
 .../aurora/scheduler/async/TaskScheduler.java   | 247 -------
 .../aurora/scheduler/async/TaskThrottler.java   |  96 ---
 .../aurora/scheduler/async/TaskTimeout.java     | 157 -----
 .../scheduler/async/preemptor/BiCache.java      | 139 ----
 .../scheduler/async/preemptor/ClusterState.java |  34 -
 .../async/preemptor/ClusterStateImpl.java       |  50 --
 .../async/preemptor/PendingTaskProcessor.java   | 258 -------
 .../async/preemptor/PreemptionProposal.java     |  66 --
 .../async/preemptor/PreemptionVictim.java       | 115 ----
 .../async/preemptor/PreemptionVictimFilter.java | 214 ------
 .../scheduler/async/preemptor/Preemptor.java    | 121 ----
 .../async/preemptor/PreemptorMetrics.java       | 131 ----
 .../async/preemptor/PreemptorModule.java        | 167 -----
 .../apache/aurora/scheduler/http/Offers.java    |   2 +-
 .../aurora/scheduler/http/PendingTasks.java     |   2 +-
 .../scheduler/mesos/MesosSchedulerImpl.java     |   2 +-
 .../aurora/scheduler/offers/OfferManager.java   | 408 +++++++++++
 .../aurora/scheduler/offers/OffersModule.java   |  63 ++
 .../offers/RandomJitterReturnDelay.java         |  49 ++
 .../aurora/scheduler/preemptor/BiCache.java     | 139 ++++
 .../scheduler/preemptor/ClusterState.java       |  34 +
 .../scheduler/preemptor/ClusterStateImpl.java   |  50 ++
 .../preemptor/PendingTaskProcessor.java         | 258 +++++++
 .../scheduler/preemptor/PreemptionProposal.java |  66 ++
 .../scheduler/preemptor/PreemptionVictim.java   | 115 ++++
 .../preemptor/PreemptionVictimFilter.java       | 214 ++++++
 .../aurora/scheduler/preemptor/Preemptor.java   | 121 ++++
 .../scheduler/preemptor/PreemptorMetrics.java   | 131 ++++
 .../scheduler/preemptor/PreemptorModule.java    | 167 +++++
 .../pruning/JobUpdateHistoryPruner.java         | 105 +++
 .../aurora/scheduler/pruning/PruningModule.java | 106 +++
 .../scheduler/pruning/TaskHistoryPruner.java    | 175 +++++
 .../scheduler/reconciliation/KillRetry.java     | 103 +++
 .../reconciliation/ReconciliationModule.java    | 118 ++++
 .../reconciliation/TaskReconciler.java          | 156 +++++
 .../scheduler/reconciliation/TaskTimeout.java   | 158 +++++
 .../scheduling/RescheduleCalculator.java        | 174 +++++
 .../scheduler/scheduling/SchedulingModule.java  | 134 ++++
 .../aurora/scheduler/scheduling/TaskGroup.java  |  77 +++
 .../aurora/scheduler/scheduling/TaskGroups.java | 239 +++++++
 .../scheduler/scheduling/TaskScheduler.java     | 248 +++++++
 .../scheduler/scheduling/TaskThrottler.java     |  97 +++
 .../scheduler/state/StateManagerImpl.java       |   2 +-
 .../scheduler/stats/AsyncStatsModule.java       |   2 +-
 .../aurora/scheduler/async/AsyncModuleTest.java |  14 -
 .../async/JobUpdateHistoryPrunerTest.java       |  69 --
 .../aurora/scheduler/async/KillRetryTest.java   | 157 -----
 .../scheduler/async/OfferManagerImplTest.java   | 234 -------
 .../apache/aurora/scheduler/async/Offers.java   |  43 --
 .../async/RandomJitterReturnDelayTest.java      |  77 ---
 .../async/RescheduleCalculatorImplTest.java     | 188 ------
 .../aurora/scheduler/async/TaskGroupsTest.java  | 140 ----
 .../scheduler/async/TaskHistoryPrunerTest.java  | 398 -----------
 .../scheduler/async/TaskReconcilerTest.java     | 140 ----
 .../scheduler/async/TaskSchedulerImplTest.java  | 340 ----------
 .../scheduler/async/TaskSchedulerTest.java      | 669 ------------------
 .../scheduler/async/TaskThrottlerTest.java      | 146 ----
 .../aurora/scheduler/async/TaskTimeoutTest.java | 244 -------
 .../scheduler/async/preemptor/BiCacheTest.java  | 107 ---
 .../async/preemptor/ClusterStateImplTest.java   | 133 ----
 .../preemptor/PendingTaskProcessorTest.java     | 285 --------
 .../preemptor/PreemptionVictimFilterTest.java   | 512 --------------
 .../async/preemptor/PreemptionVictimTest.java   |  49 --
 .../async/preemptor/PreemptorImplTest.java      | 177 -----
 .../async/preemptor/PreemptorModuleTest.java    |  91 ---
 .../scheduler/http/JettyServerModuleTest.java   |   8 +-
 .../scheduler/mesos/MesosSchedulerImplTest.java |   2 +-
 .../scheduler/offers/OfferManagerImplTest.java  | 234 +++++++
 .../apache/aurora/scheduler/offers/Offers.java  |  43 ++
 .../offers/RandomJitterReturnDelayTest.java     |  77 +++
 .../aurora/scheduler/preemptor/BiCacheTest.java | 107 +++
 .../preemptor/ClusterStateImplTest.java         | 133 ++++
 .../preemptor/PendingTaskProcessorTest.java     | 285 ++++++++
 .../preemptor/PreemptionVictimFilterTest.java   | 512 ++++++++++++++
 .../preemptor/PreemptionVictimTest.java         |  49 ++
 .../scheduler/preemptor/PreemptorImplTest.java  | 177 +++++
 .../preemptor/PreemptorModuleTest.java          |  91 +++
 .../pruning/JobUpdateHistoryPrunerTest.java     |  69 ++
 .../pruning/TaskHistoryPrunerTest.java          | 398 +++++++++++
 .../scheduler/reconciliation/KillRetryTest.java | 159 +++++
 .../reconciliation/TaskReconcilerTest.java      | 140 ++++
 .../reconciliation/TaskTimeoutTest.java         | 244 +++++++
 .../RescheduleCalculatorImplTest.java           | 188 ++++++
 .../scheduler/scheduling/TaskGroupsTest.java    | 140 ++++
 .../scheduling/TaskSchedulerImplTest.java       | 342 ++++++++++
 .../scheduler/scheduling/TaskSchedulerTest.java | 671 +++++++++++++++++++
 .../scheduler/scheduling/TaskThrottlerTest.java | 146 ++++
 .../scheduler/state/StateManagerImplTest.java   |   2 +-
 .../aurora/scheduler/updater/JobUpdaterIT.java  |   4 +-
 106 files changed, 7953 insertions(+), 7814 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/jmh/java/org/apache/aurora/benchmark/Offers.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/Offers.java b/src/jmh/java/org/apache/aurora/benchmark/Offers.java
index b8e6cb5..e40db74 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/Offers.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/Offers.java
@@ -20,8 +20,8 @@ import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Data;
 
 import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.async.OfferManager;
 import org.apache.aurora.scheduler.configuration.Resources;
+import org.apache.aurora.scheduler.offers.OfferManager;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.mesos.Protos;
 
@@ -34,9 +34,9 @@ final class Offers {
   }
 
   /**
-   * Saves offers into the {@link org.apache.aurora.scheduler.async.OfferManager}.
+   * Saves offers into the {@link OfferManager}.
    *
-   * @param offerManager {@link org.apache.aurora.scheduler.async.OfferManager} to save into.
+   * @param offerManager {@link OfferManager} to save into.
    * @param offers Offers to save.
    */
   static void addOffers(OfferManager offerManager, Iterable<HostOffer> offers) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index d9e5199..5716f23 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -38,20 +38,20 @@ import org.apache.aurora.benchmark.fakes.FakeRescheduleCalculator;
 import org.apache.aurora.benchmark.fakes.FakeStatsProvider;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.TaskIdGenerator;
-import org.apache.aurora.scheduler.async.OfferManager;
-import org.apache.aurora.scheduler.async.RescheduleCalculator;
-import org.apache.aurora.scheduler.async.TaskScheduler;
-import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl.ReservationDuration;
-import org.apache.aurora.scheduler.async.preemptor.BiCache;
-import org.apache.aurora.scheduler.async.preemptor.ClusterStateImpl;
-import org.apache.aurora.scheduler.async.preemptor.PendingTaskProcessor;
-import org.apache.aurora.scheduler.async.preemptor.PreemptorModule;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
 import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.mesos.ExecutorSettings;
+import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.preemptor.BiCache;
+import org.apache.aurora.scheduler.preemptor.ClusterStateImpl;
+import org.apache.aurora.scheduler.preemptor.PendingTaskProcessor;
+import org.apache.aurora.scheduler.preemptor.PreemptorModule;
+import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
+import org.apache.aurora.scheduler.scheduling.TaskScheduler;
+import org.apache.aurora.scheduler.scheduling.TaskScheduler.TaskSchedulerImpl.ReservationDuration;
 import org.apache.aurora.scheduler.state.StateModule;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.db.DbUtil;

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
index e08d16e..3931d02 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
@@ -49,9 +49,6 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.TaskIdGenerator;
 import org.apache.aurora.scheduler.TaskStatusHandler;
 import org.apache.aurora.scheduler.TaskStatusHandlerImpl;
-import org.apache.aurora.scheduler.async.OfferManager;
-import org.apache.aurora.scheduler.async.RescheduleCalculator;
-import org.apache.aurora.scheduler.async.preemptor.ClusterStateImpl;
 import org.apache.aurora.scheduler.base.AsyncUtil;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
@@ -62,6 +59,9 @@ import org.apache.aurora.scheduler.mesos.DriverFactory;
 import org.apache.aurora.scheduler.mesos.DriverSettings;
 import org.apache.aurora.scheduler.mesos.ExecutorSettings;
 import org.apache.aurora.scheduler.mesos.MesosSchedulerImpl;
+import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.preemptor.ClusterStateImpl;
+import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
 import org.apache.aurora.scheduler.state.StateModule;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.db.DbUtil;

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
index 45849b5..f413301 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
@@ -17,9 +17,9 @@ import com.google.common.base.Function;
 import com.google.common.base.Optional;
 
 import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.async.OfferManager;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.offers.OfferManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;
 import org.apache.mesos.Protos;
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeRescheduleCalculator.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeRescheduleCalculator.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeRescheduleCalculator.java
index 6d71012..4af2339 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeRescheduleCalculator.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeRescheduleCalculator.java
@@ -13,7 +13,7 @@
  */
 package org.apache.aurora.benchmark.fakes;
 
-import org.apache.aurora.scheduler.async.RescheduleCalculator;
+import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
 public class FakeRescheduleCalculator implements RescheduleCalculator {

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java b/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
index 1a158b4..ecadb3e 100644
--- a/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
+++ b/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
@@ -24,9 +24,9 @@ import com.google.common.collect.Ordering;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Data;
 
-import org.apache.aurora.scheduler.async.preemptor.PreemptionVictim;
 import org.apache.aurora.scheduler.configuration.Resources;
 import org.apache.aurora.scheduler.mesos.ExecutorSettings;
+import org.apache.aurora.scheduler.preemptor.PreemptionVictim;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.mesos.Protos;
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
index d2c1720..4cc1127 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
@@ -47,13 +47,17 @@ import org.apache.aurora.gen.ServerInfo;
 import org.apache.aurora.scheduler.SchedulerModule;
 import org.apache.aurora.scheduler.SchedulerServicesModule;
 import org.apache.aurora.scheduler.async.AsyncModule;
-import org.apache.aurora.scheduler.async.preemptor.PreemptorModule;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
 import org.apache.aurora.scheduler.http.JettyServerModule;
 import org.apache.aurora.scheduler.mesos.SchedulerDriverModule;
 import org.apache.aurora.scheduler.metadata.MetadataModule;
+import org.apache.aurora.scheduler.offers.OffersModule;
+import org.apache.aurora.scheduler.preemptor.PreemptorModule;
+import org.apache.aurora.scheduler.pruning.PruningModule;
 import org.apache.aurora.scheduler.quota.QuotaModule;
+import org.apache.aurora.scheduler.reconciliation.ReconciliationModule;
+import org.apache.aurora.scheduler.scheduling.SchedulingModule;
 import org.apache.aurora.scheduler.sla.SlaModule;
 import org.apache.aurora.scheduler.state.StateModule;
 import org.apache.aurora.scheduler.stats.AsyncStatsModule;
@@ -115,6 +119,10 @@ public class AppModule extends AbstractModule {
     LifecycleModule.bindStartupAction(binder(), RegisterShutdownStackPrinter.class);
 
     install(new AsyncModule());
+    install(new OffersModule());
+    install(new PruningModule());
+    install(new ReconciliationModule());
+    install(new SchedulingModule());
     install(new AsyncStatsModule());
     install(new MetadataModule());
     install(new QuotaModule());

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index 8c2d751..c345c92 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -15,45 +15,24 @@ package org.apache.aurora.scheduler.async;
 
 import java.lang.annotation.Retention;
 import java.lang.annotation.Target;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
 import javax.inject.Qualifier;
-import javax.inject.Singleton;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.common.util.concurrent.RateLimiter;
 import com.google.inject.AbstractModule;
-import com.google.inject.PrivateModule;
-import com.google.inject.TypeLiteral;
 import com.twitter.common.args.Arg;
 import com.twitter.common.args.CmdLine;
-import com.twitter.common.args.constraints.NotNegative;
-import com.twitter.common.args.constraints.Positive;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
 import com.twitter.common.stats.StatsProvider;
-import com.twitter.common.util.BackoffStrategy;
-import com.twitter.common.util.Random;
-import com.twitter.common.util.TruncatedBinaryBackoff;
 
 import org.apache.aurora.scheduler.SchedulerServicesModule;
-import org.apache.aurora.scheduler.async.OfferManager.OfferManagerImpl;
-import org.apache.aurora.scheduler.async.OfferManager.OfferReturnDelay;
-import org.apache.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl;
-import org.apache.aurora.scheduler.async.TaskGroups.TaskGroupsSettings;
-import org.apache.aurora.scheduler.async.TaskHistoryPruner.HistoryPrunnerSettings;
-import org.apache.aurora.scheduler.async.TaskReconciler.TaskReconcilerSettings;
-import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
-import org.apache.aurora.scheduler.async.preemptor.BiCache;
-import org.apache.aurora.scheduler.async.preemptor.BiCache.BiCacheSettings;
 import org.apache.aurora.scheduler.base.AsyncUtil;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.events.PubsubEventModule;
 
 import static java.lang.annotation.ElementType.FIELD;
 import static java.lang.annotation.ElementType.METHOD;
@@ -65,141 +44,15 @@ import static java.util.Objects.requireNonNull;
  * Binding module for async task management.
  */
 public class AsyncModule extends AbstractModule {
-
   private static final Logger LOG = Logger.getLogger(AsyncModule.class.getName());
 
   @CmdLine(name = "async_worker_threads",
       help = "The number of worker threads to process async task operations with.")
   private static final Arg<Integer> ASYNC_WORKER_THREADS = Arg.create(1);
 
-  @CmdLine(name = "transient_task_state_timeout",
-      help = "The amount of time after which to treat a task stuck in a transient state as LOST.")
-  private static final Arg<Amount<Long, Time>> TRANSIENT_TASK_STATE_TIMEOUT =
-      Arg.create(Amount.of(5L, Time.MINUTES));
-
-  @Positive
-  @CmdLine(name = "first_schedule_delay",
-      help = "Initial amount of time to wait before first attempting to schedule a PENDING task.")
-  private static final Arg<Amount<Long, Time>> FIRST_SCHEDULE_DELAY =
-      Arg.create(Amount.of(1L, Time.MILLISECONDS));
-
-  @Positive
-  @CmdLine(name = "initial_schedule_penalty",
-      help = "Initial amount of time to wait before attempting to schedule a task that has failed"
-          + " to schedule.")
-  private static final Arg<Amount<Long, Time>> INITIAL_SCHEDULE_PENALTY =
-      Arg.create(Amount.of(1L, Time.SECONDS));
-
-  @CmdLine(name = "max_schedule_penalty",
-      help = "Maximum delay between attempts to schedule a PENDING tasks.")
-  private static final Arg<Amount<Long, Time>> MAX_SCHEDULE_PENALTY =
-      Arg.create(Amount.of(1L, Time.MINUTES));
-
-  @CmdLine(name = "min_offer_hold_time",
-      help = "Minimum amount of time to hold a resource offer before declining.")
-  @NotNegative
-  private static final Arg<Amount<Integer, Time>> MIN_OFFER_HOLD_TIME =
-      Arg.create(Amount.of(5, Time.MINUTES));
-
-  @CmdLine(name = "offer_hold_jitter_window",
-      help = "Maximum amount of random jitter to add to the offer hold time window.")
-  @NotNegative
-  private static final Arg<Amount<Integer, Time>> OFFER_HOLD_JITTER_WINDOW =
-      Arg.create(Amount.of(1, Time.MINUTES));
-
-  @CmdLine(name = "history_prune_threshold",
-      help = "Time after which the scheduler will prune terminated task history.")
-  private static final Arg<Amount<Long, Time>> HISTORY_PRUNE_THRESHOLD =
-      Arg.create(Amount.of(2L, Time.DAYS));
-
-  @CmdLine(name = "history_max_per_job_threshold",
-      help = "Maximum number of terminated tasks to retain in a job history.")
-  private static final Arg<Integer> HISTORY_MAX_PER_JOB_THRESHOLD = Arg.create(100);
-
-  @CmdLine(name = "history_min_retention_threshold",
-      help = "Minimum guaranteed time for task history retention before any pruning is attempted.")
-  private static final Arg<Amount<Long, Time>> HISTORY_MIN_RETENTION_THRESHOLD =
-      Arg.create(Amount.of(1L, Time.HOURS));
-
-  @CmdLine(name = "max_schedule_attempts_per_sec",
-      help = "Maximum number of scheduling attempts to make per second.")
-  private static final Arg<Double> MAX_SCHEDULE_ATTEMPTS_PER_SEC = Arg.create(40D);
-
-  @CmdLine(name = "flapping_task_threshold",
-      help = "A task that repeatedly runs for less than this time is considered to be flapping.")
-  private static final Arg<Amount<Long, Time>> FLAPPING_THRESHOLD =
-      Arg.create(Amount.of(5L, Time.MINUTES));
-
-  @CmdLine(name = "initial_flapping_task_delay",
-      help = "Initial amount of time to wait before attempting to schedule a flapping task.")
-  private static final Arg<Amount<Long, Time>> INITIAL_FLAPPING_DELAY =
-      Arg.create(Amount.of(30L, Time.SECONDS));
-
-  @CmdLine(name = "max_flapping_task_delay",
-      help = "Maximum delay between attempts to schedule a flapping task.")
-  private static final Arg<Amount<Long, Time>> MAX_FLAPPING_DELAY =
-      Arg.create(Amount.of(5L, Time.MINUTES));
-
-  @CmdLine(name = "max_reschedule_task_delay_on_startup",
-      help = "Upper bound of random delay for pending task rescheduling on scheduler startup.")
-  private static final Arg<Amount<Integer, Time>> MAX_RESCHEDULING_DELAY =
-      Arg.create(Amount.of(30, Time.SECONDS));
-
-  @CmdLine(name = "job_update_history_per_job_threshold",
-      help = "Maximum number of completed job updates to retain in a job update history.")
-  private static final Arg<Integer> JOB_UPDATE_HISTORY_PER_JOB_THRESHOLD = Arg.create(10);
-
-  @CmdLine(name = "job_update_history_pruning_interval",
-      help = "Job update history pruning interval.")
-  private static final Arg<Amount<Long, Time>> JOB_UPDATE_HISTORY_PRUNING_INTERVAL =
-      Arg.create(Amount.of(15L, Time.MINUTES));
-
-  @CmdLine(name = "job_update_history_pruning_threshold",
-      help = "Time after which the scheduler will prune completed job update history.")
-  private static final Arg<Amount<Long, Time>> JOB_UPDATE_HISTORY_PRUNING_THRESHOLD =
-      Arg.create(Amount.of(30L, Time.DAYS));
-
-  @CmdLine(name = "initial_task_kill_retry_interval",
-      help = "When killing a task, retry after this delay if mesos has not responded,"
-          + " backing off up to transient_task_state_timeout")
-  private static final Arg<Amount<Long, Time>> INITIAL_TASK_KILL_RETRY_INTERVAL =
-      Arg.create(Amount.of(5L, Time.SECONDS));
-
-  @CmdLine(name = "offer_reservation_duration", help = "Time to reserve a slave's offers while "
-      + "trying to satisfy a task preempting another.")
-  private static final Arg<Amount<Long, Time>> RESERVATION_DURATION =
-      Arg.create(Amount.of(3L, Time.MINUTES));
-
-  // Reconciliation may create a big surge of status updates in a large cluster. Setting the default
-  // initial delay to 1 minute to ease up storage contention during scheduler start up.
-  @CmdLine(name = "reconciliation_initial_delay",
-      help = "Initial amount of time to delay task reconciliation after scheduler start up.")
-  private static final Arg<Amount<Long, Time>> RECONCILIATION_INITIAL_DELAY =
-      Arg.create(Amount.of(1L, Time.MINUTES));
-
-  @Positive
-  @CmdLine(name = "reconciliation_explicit_interval",
-      help = "Interval on which scheduler will ask Mesos for status updates of all non-terminal "
-      + "tasks known to scheduler.")
-  private static final Arg<Amount<Long, Time>> RECONCILIATION_EXPLICIT_INTERVAL =
-      Arg.create(Amount.of(60L, Time.MINUTES));
-
-  @Positive
-  @CmdLine(name = "reconciliation_implicit_interval",
-      help = "Interval on which scheduler will ask Mesos for status updates of all non-terminal "
-          + "tasks known to Mesos.")
-  private static final Arg<Amount<Long, Time>> RECONCILIATION_IMPLICIT_INTERVAL =
-      Arg.create(Amount.of(60L, Time.MINUTES));
-
-  @CmdLine(name = "reconciliation_schedule_spread",
-      help = "Difference between explicit and implicit reconciliation intervals intended to "
-          + "create a non-overlapping task reconciliation schedule.")
-  private static final Arg<Amount<Long, Time>> RECONCILIATION_SCHEDULE_SPREAD =
-      Arg.create(Amount.of(30L, Time.MINUTES));
-
   @Qualifier
   @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-  private @interface AsyncExecutor { }
+  public @interface AsyncExecutor { }
 
   @VisibleForTesting
   static final String TIMEOUT_QUEUE_GAUGE = "timeout_queue_size";
@@ -213,152 +66,9 @@ public class AsyncModule extends AbstractModule {
     final ScheduledThreadPoolExecutor executor =
         AsyncUtil.loggingScheduledExecutor(ASYNC_WORKER_THREADS.get(), "AsyncProcessor-%d", LOG);
     bind(ScheduledThreadPoolExecutor.class).annotatedWith(AsyncExecutor.class).toInstance(executor);
+    bind(ScheduledExecutorService.class).annotatedWith(AsyncExecutor.class).toInstance(executor);
+    bind(ExecutorService.class).annotatedWith(AsyncExecutor.class).toInstance(executor);
     SchedulerServicesModule.addAppStartupServiceBinding(binder()).to(RegisterGauges.class);
-
-    // AsyncModule itself is not a subclass of PrivateModule because TaskEventModule internally uses
-    // a MultiBinder, which cannot span multiple injectors.
-    install(new PrivateModule() {
-      @Override
-      protected void configure() {
-        bind(new TypeLiteral<Amount<Long, Time>>() { })
-            .toInstance(TRANSIENT_TASK_STATE_TIMEOUT.get());
-        bind(ScheduledExecutorService.class).toInstance(executor);
-
-        bind(TaskTimeout.class).in(Singleton.class);
-        expose(TaskTimeout.class);
-      }
-    });
-    PubsubEventModule.bindSubscriber(binder(), TaskTimeout.class);
-    SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(TaskTimeout.class);
-
-    install(new PrivateModule() {
-      @Override
-      protected void configure() {
-        bind(TaskGroupsSettings.class).toInstance(new TaskGroupsSettings(
-            FIRST_SCHEDULE_DELAY.get(),
-            new TruncatedBinaryBackoff(
-                INITIAL_SCHEDULE_PENALTY.get(),
-                MAX_SCHEDULE_PENALTY.get()),
-            RateLimiter.create(MAX_SCHEDULE_ATTEMPTS_PER_SEC.get())));
-
-        bind(RescheduleCalculatorImpl.RescheduleCalculatorSettings.class)
-            .toInstance(new RescheduleCalculatorImpl.RescheduleCalculatorSettings(
-                new TruncatedBinaryBackoff(INITIAL_FLAPPING_DELAY.get(), MAX_FLAPPING_DELAY.get()),
-                FLAPPING_THRESHOLD.get(),
-                MAX_RESCHEDULING_DELAY.get()));
-
-        bind(RescheduleCalculator.class).to(RescheduleCalculatorImpl.class).in(Singleton.class);
-        expose(RescheduleCalculator.class);
-        bind(TaskGroups.class).in(Singleton.class);
-        expose(TaskGroups.class);
-      }
-    });
-    PubsubEventModule.bindSubscriber(binder(), TaskGroups.class);
-
-    install(new PrivateModule() {
-      @Override
-      protected void configure() {
-        bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).in(Singleton.class);
-        bind(BiCacheSettings.class).toInstance(
-            new BiCacheSettings(RESERVATION_DURATION.get(), "reservation_cache_size"));
-        bind(TaskScheduler.class).to(TaskSchedulerImpl.class);
-        bind(TaskSchedulerImpl.class).in(Singleton.class);
-        expose(TaskScheduler.class);
-      }
-    });
-    PubsubEventModule.bindSubscriber(binder(), TaskScheduler.class);
-
-    install(new PrivateModule() {
-      @Override
-      protected void configure() {
-        bind(OfferReturnDelay.class).toInstance(
-            new RandomJitterReturnDelay(
-                MIN_OFFER_HOLD_TIME.get().as(Time.MILLISECONDS),
-                OFFER_HOLD_JITTER_WINDOW.get().as(Time.MILLISECONDS),
-                new Random.SystemRandom(new java.util.Random())));
-        bind(ScheduledExecutorService.class).toInstance(executor);
-        bind(OfferManager.class).to(OfferManagerImpl.class);
-        bind(OfferManagerImpl.class).in(Singleton.class);
-        expose(OfferManager.class);
-      }
-    });
-    PubsubEventModule.bindSubscriber(binder(), OfferManager.class);
-
-    install(new PrivateModule() {
-      @Override
-      protected void configure() {
-        // TODO(ksweeney): Create a configuration validator module so this can be injected.
-        // TODO(William Farner): Revert this once large task counts is cheap ala hierarchichal store
-        bind(HistoryPrunnerSettings.class).toInstance(new HistoryPrunnerSettings(
-            HISTORY_PRUNE_THRESHOLD.get(),
-            HISTORY_MIN_RETENTION_THRESHOLD.get(),
-            HISTORY_MAX_PER_JOB_THRESHOLD.get()
-        ));
-        bind(ScheduledExecutorService.class).toInstance(executor);
-
-        bind(TaskHistoryPruner.class).in(Singleton.class);
-        expose(TaskHistoryPruner.class);
-      }
-    });
-    PubsubEventModule.bindSubscriber(binder(), TaskHistoryPruner.class);
-
-    install(new PrivateModule() {
-      @Override
-      protected void configure() {
-        bind(ScheduledExecutorService.class).toInstance(executor);
-        bind(TaskThrottler.class).in(Singleton.class);
-        expose(TaskThrottler.class);
-      }
-    });
-    PubsubEventModule.bindSubscriber(binder(), TaskThrottler.class);
-
-    install(new PrivateModule() {
-      @Override
-      protected void configure() {
-        bind(TaskReconcilerSettings.class).toInstance(new TaskReconcilerSettings(
-            RECONCILIATION_INITIAL_DELAY.get(),
-            RECONCILIATION_EXPLICIT_INTERVAL.get(),
-            RECONCILIATION_IMPLICIT_INTERVAL.get(),
-            RECONCILIATION_SCHEDULE_SPREAD.get()));
-        bind(ScheduledExecutorService.class).toInstance(executor);
-        bind(TaskReconciler.class).in(Singleton.class);
-        expose(TaskReconciler.class);
-      }
-    });
-    SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(TaskReconciler.class);
-
-    install(new PrivateModule() {
-      @Override
-      protected void configure() {
-        bind(JobUpdateHistoryPruner.HistoryPrunerSettings.class).toInstance(
-            new JobUpdateHistoryPruner.HistoryPrunerSettings(
-                JOB_UPDATE_HISTORY_PRUNING_INTERVAL.get(),
-                JOB_UPDATE_HISTORY_PRUNING_THRESHOLD.get(),
-                JOB_UPDATE_HISTORY_PER_JOB_THRESHOLD.get()));
-
-        bind(ScheduledExecutorService.class).toInstance(
-            AsyncUtil.singleThreadLoggingScheduledExecutor("JobUpdatePruner-%d", LOG));
-
-        bind(JobUpdateHistoryPruner.class).in(Singleton.class);
-        expose(JobUpdateHistoryPruner.class);
-      }
-    });
-    SchedulerServicesModule.addSchedulerActiveServiceBinding(binder())
-        .to(JobUpdateHistoryPruner.class);
-
-    install(new PrivateModule() {
-      @Override
-      protected void configure() {
-        bind(ScheduledExecutorService.class).toInstance(executor);
-        bind(BackoffStrategy.class).toInstance(
-            new TruncatedBinaryBackoff(
-                INITIAL_TASK_KILL_RETRY_INTERVAL.get(),
-                TRANSIENT_TASK_STATE_TIMEOUT.get()));
-        bind(KillRetry.class).in(Singleton.class);
-        expose(KillRetry.class);
-      }
-    });
-    PubsubEventModule.bindSubscriber(binder(), KillRetry.class);
   }
 
   static class RegisterGauges extends AbstractIdleService {

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java
deleted file mode 100644
index b416343..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Joiner;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.Clock;
-
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Prunes per-job update history on a periodic basis.
- */
-class JobUpdateHistoryPruner extends AbstractIdleService {
-  private static final Logger LOG = Logger.getLogger(JobUpdateHistoryPruner.class.getName());
-
-  private final Clock clock;
-  private final ScheduledExecutorService executor;
-  private final Storage storage;
-  private final HistoryPrunerSettings settings;
-
-  static class HistoryPrunerSettings {
-    private final Amount<Long, Time> pruneInterval;
-    private final Amount<Long, Time> maxHistorySize;
-    private final int maxUpdatesPerJob;
-
-    HistoryPrunerSettings(
-        Amount<Long, Time> pruneInterval,
-        Amount<Long, Time> maxHistorySize,
-        int maxUpdatesPerJob) {
-
-      this.pruneInterval = requireNonNull(pruneInterval);
-      this.maxHistorySize = requireNonNull(maxHistorySize);
-      this.maxUpdatesPerJob = maxUpdatesPerJob;
-    }
-  }
-
-  @Inject
-  JobUpdateHistoryPruner(
-      Clock clock,
-      ScheduledExecutorService executor,
-      Storage storage,
-      HistoryPrunerSettings settings) {
-
-    this.clock = requireNonNull(clock);
-    this.executor = requireNonNull(executor);
-    this.storage = requireNonNull(storage);
-    this.settings = requireNonNull(settings);
-  }
-
-  @Override
-  protected void startUp() {
-    executor.scheduleAtFixedRate(
-        new Runnable() {
-          @Override
-          public void run() {
-            storage.write(new MutateWork.NoResult.Quiet() {
-              @Override
-              public void execute(MutableStoreProvider storeProvider) {
-                Set<IJobUpdateKey> prunedUpdates = storeProvider.getJobUpdateStore().pruneHistory(
-                    settings.maxUpdatesPerJob,
-                    clock.nowMillis() - settings.maxHistorySize.as(Time.MILLISECONDS));
-
-                LOG.info(prunedUpdates.isEmpty()
-                    ? "No job update history to prune."
-                    : "Pruned job update history: " + Joiner.on(",").join(prunedUpdates));
-              }
-            });
-          }
-        },
-        settings.pruneInterval.as(Time.MILLISECONDS),
-        settings.pruneInterval.as(Time.MILLISECONDS),
-        TimeUnit.MILLISECONDS);
-  }
-
-  @Override
-  protected void shutDown() {
-    // Nothing to do - await VM shutdown.
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java b/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java
deleted file mode 100644
index b125c1c..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
-import com.google.common.eventbus.Subscribe;
-import com.twitter.common.stats.StatsProvider;
-import com.twitter.common.util.BackoffStrategy;
-
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.mesos.Driver;
-import org.apache.aurora.scheduler.storage.Storage;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Watches for task transitions into {@link ScheduleStatus#KILLING KILLING} and periodically
- * retries {@link Driver#killTask(String)} until the task transitions.
- */
-public class KillRetry implements EventSubscriber {
-  private static final Logger LOG = Logger.getLogger(KillRetry.class.getName());
-
-  @VisibleForTesting
-  static final String RETRIES_COUNTER = "task_kill_retries";
-
-  private final Driver driver;
-  private final Storage storage;
-  private final ScheduledExecutorService executor;
-  private final BackoffStrategy backoffStrategy;
-  private final AtomicLong killRetries;
-
-  @Inject
-  KillRetry(
-      Driver driver,
-      Storage storage,
-      ScheduledExecutorService executor,
-      BackoffStrategy backoffStrategy,
-      StatsProvider statsProvider) {
-
-    this.driver = requireNonNull(driver);
-    this.storage = requireNonNull(storage);
-    this.executor = requireNonNull(executor);
-    this.backoffStrategy = requireNonNull(backoffStrategy);
-    killRetries = statsProvider.makeCounter(RETRIES_COUNTER);
-  }
-
-  @Subscribe
-  public void taskChangedState(TaskStateChange stateChange) {
-    if (stateChange.getNewState() == ScheduleStatus.KILLING) {
-      new KillAttempt(stateChange.getTaskId()).tryLater();
-    }
-  }
-
-  private class KillAttempt implements Runnable {
-    private final String taskId;
-    private final AtomicLong retryInMs = new AtomicLong();
-
-    KillAttempt(String taskId) {
-      this.taskId = taskId;
-    }
-
-    void tryLater() {
-      retryInMs.set(backoffStrategy.calculateBackoffMs(retryInMs.get()));
-      executor.schedule(this, retryInMs.get(), TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public void run() {
-      Query.Builder query = Query.taskScoped(taskId).byStatus(ScheduleStatus.KILLING);
-      if (!Iterables.isEmpty(Storage.Util.fetchTasks(storage, query))) {
-        LOG.info("Task " + taskId + " not yet killed, retrying.");
-
-        // Kill did not yet take effect, try again.
-        driver.killTask(taskId);
-        killRetries.incrementAndGet();
-        tryLater();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java
deleted file mode 100644
index e60d01e..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java
+++ /dev/null
@@ -1,406 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.Comparator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Ordering;
-import com.google.common.eventbus.Subscribe;
-import com.twitter.common.inject.TimedInterceptor.Timed;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.Stats;
-
-import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
-import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.mesos.Driver;
-import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.SlaveID;
-
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.gen.MaintenanceMode.DRAINED;
-import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
-import static org.apache.aurora.gen.MaintenanceMode.NONE;
-import static org.apache.aurora.gen.MaintenanceMode.SCHEDULED;
-import static org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged;
-
-/**
- * Tracks the Offers currently known by the scheduler.
- */
-public interface OfferManager extends EventSubscriber {
-
-  /**
-   * Notifies the scheduler of a new resource offer.
-   *
-   * @param offer Newly-available resource offer.
-   */
-  void addOffer(HostOffer offer);
-
-  /**
-   * Invalidates an offer.  This indicates that the scheduler should not attempt to match any
-   * tasks against the offer.
-   *
-   * @param offer Canceled offer.
-   */
-  void cancelOffer(OfferID offer);
-
-  /**
-   * Launches the first task that satisfies the {@code acceptor} by returning a {@link Assignment}.
-   *
-   * @param acceptor Function that determines if an offer is accepted.
-   * @param groupKey Task group key.
-   * @return {@code true} if the task was launched, {@code false} if no offers satisfied the
-   *         {@code acceptor}.
-   * @throws LaunchException If the acceptor accepted an offer, but there was an error launching the
-   *                         task.
-   */
-  boolean launchFirst(Function<HostOffer, Assignment> acceptor, TaskGroupKey groupKey)
-      throws LaunchException;
-
-  /**
-   * Notifies the offer queue that a host's attributes have changed.
-   *
-   * @param change State change notification.
-   */
-  void hostAttributesChanged(HostAttributesChanged change);
-
-  /**
-   * Gets the offers that the scheduler is holding.
-   *
-   * @return A snapshot of the offers that the scheduler is currently holding.
-   */
-  Iterable<HostOffer> getOffers();
-
-  /**
-   * Gets an offer for the given slave ID.
-   *
-   * @param slaveId Slave ID to get offer for.
-   * @return An offer for the slave ID.
-   */
-  Optional<HostOffer> getOffer(SlaveID slaveId);
-
-  /**
-   * Calculates the amount of time before an offer should be 'returned' by declining it.
-   * The delay is calculated for each offer that is received, so the return delay may be
-   * fixed or variable.
-   */
-  interface OfferReturnDelay extends Supplier<Amount<Long, Time>> {
-  }
-
-  /**
-   * Thrown when there was an unexpected failure trying to launch a task.
-   */
-  class LaunchException extends Exception {
-    LaunchException(String msg) {
-      super(msg);
-    }
-
-    LaunchException(String msg, Throwable cause) {
-      super(msg, cause);
-    }
-  }
-
-  class OfferManagerImpl implements OfferManager {
-    @VisibleForTesting
-    static final Logger LOG = Logger.getLogger(OfferManagerImpl.class.getName());
-
-    private final HostOffers hostOffers = new HostOffers();
-    private final AtomicLong offerRaces = Stats.exportLong("offer_accept_races");
-
-    private final Driver driver;
-    private final OfferReturnDelay returnDelay;
-    private final ScheduledExecutorService executor;
-
-    @Inject
-    OfferManagerImpl(
-        Driver driver,
-        OfferReturnDelay returnDelay,
-        ScheduledExecutorService executor) {
-
-      this.driver = requireNonNull(driver);
-      this.returnDelay = requireNonNull(returnDelay);
-      this.executor = requireNonNull(executor);
-    }
-
-    @Override
-    public void addOffer(final HostOffer offer) {
-      // We run a slight risk of a race here, which is acceptable.  The worst case is that we
-      // temporarily hold two offers for the same host, which should be corrected when we return
-      // them after the return delay.
-      // There's also a chance that we return an offer for compaction ~simultaneously with the
-      // same-host offer being canceled/returned.  This is also fine.
-      Optional<HostOffer> sameSlave = hostOffers.get(offer.getOffer().getSlaveId());
-      if (sameSlave.isPresent()) {
-        // If there are existing offers for the slave, decline all of them so the master can
-        // compact all of those offers into a single offer and send them back.
-        LOG.info("Returning offers for " + offer.getOffer().getSlaveId().getValue()
-            + " for compaction.");
-        decline(offer.getOffer().getId());
-        removeAndDecline(sameSlave.get().getOffer().getId());
-      } else {
-        hostOffers.add(offer);
-        executor.schedule(
-            new Runnable() {
-              @Override
-              public void run() {
-                removeAndDecline(offer.getOffer().getId());
-              }
-            },
-            returnDelay.get().as(Time.MILLISECONDS),
-            TimeUnit.MILLISECONDS);
-      }
-    }
-
-    void removeAndDecline(OfferID id) {
-      if (removeFromHostOffers(id)) {
-        decline(id);
-      }
-    }
-
-    void decline(OfferID id) {
-      LOG.fine("Declining offer " + id);
-      driver.declineOffer(id);
-    }
-
-    @Override
-    public void cancelOffer(final OfferID offerId) {
-      removeFromHostOffers(offerId);
-    }
-
-    private boolean removeFromHostOffers(final OfferID offerId) {
-      requireNonNull(offerId);
-
-      // The small risk of inconsistency is acceptable here - if we have an accept/remove race
-      // on an offer, the master will mark the task as LOST and it will be retried.
-      return hostOffers.remove(offerId);
-    }
-
-    @Override
-    public Iterable<HostOffer> getOffers() {
-      return hostOffers.getWeaklyConsistentOffers();
-    }
-
-    @Override
-    public Optional<HostOffer> getOffer(SlaveID slaveId) {
-      return hostOffers.get(slaveId);
-    }
-
-    /**
-     * Updates the preference of a host's offers.
-     *
-     * @param change Host change notification.
-     */
-    @Subscribe
-    public void hostAttributesChanged(HostAttributesChanged change) {
-      hostOffers.updateHostAttributes(change.getAttributes());
-    }
-
-    /**
-     * Notifies the queue that the driver is disconnected, and all the stored offers are now
-     * invalid.
-     * <p>
-     * The queue takes this as a signal to flush its queue.
-     *
-     * @param event Disconnected event.
-     */
-    @Subscribe
-    public void driverDisconnected(DriverDisconnected event) {
-      LOG.info("Clearing stale offers since the driver is disconnected.");
-      hostOffers.clear();
-    }
-
-    /**
-     * A container for the data structures used by this class, to make it easier to reason about
-     * the different indices used and their consistency.
-     */
-    private static class HostOffers {
-      private static final Comparator<HostOffer> PREFERENCE_COMPARATOR =
-          // Currently, the only preference is based on host maintenance status.
-          Ordering.explicit(NONE, SCHEDULED, DRAINING, DRAINED)
-              .onResultOf(new Function<HostOffer, MaintenanceMode>() {
-                @Override
-                public MaintenanceMode apply(HostOffer offer) {
-                  return offer.getAttributes().getMode();
-                }
-              })
-              .compound(Ordering.arbitrary());
-
-      private final Set<HostOffer> offers = new ConcurrentSkipListSet<>(PREFERENCE_COMPARATOR);
-      private final Map<OfferID, HostOffer> offersById = Maps.newHashMap();
-      private final Map<SlaveID, HostOffer> offersBySlave = Maps.newHashMap();
-      private final Map<String, HostOffer> offersByHost = Maps.newHashMap();
-      // TODO(maxim): Expose via a debug endpoint. AURORA-1136.
-      // Keep track of offer->groupKey mappings that will never be matched to avoid redundant
-      // scheduling attempts. See Assignment.Result for more details on static ban.
-      private final Multimap<OfferID, TaskGroupKey> staticallyBannedOffers = HashMultimap.create();
-
-      HostOffers() {
-        // Potential gotcha - since this is a ConcurrentSkipListSet, size() is more expensive.
-        // Could track this separately if it turns out to pose problems.
-        Stats.exportSize("outstanding_offers", offers);
-      }
-
-      synchronized Optional<HostOffer> get(SlaveID slaveId) {
-        return Optional.fromNullable(offersBySlave.get(slaveId));
-      }
-
-      synchronized void add(HostOffer offer) {
-        offers.add(offer);
-        offersById.put(offer.getOffer().getId(), offer);
-        offersBySlave.put(offer.getOffer().getSlaveId(), offer);
-        offersByHost.put(offer.getOffer().getHostname(), offer);
-      }
-
-      synchronized boolean remove(OfferID id) {
-        HostOffer removed = offersById.remove(id);
-        if (removed != null) {
-          offers.remove(removed);
-          offersBySlave.remove(removed.getOffer().getSlaveId());
-          offersByHost.remove(removed.getOffer().getHostname());
-          staticallyBannedOffers.removeAll(id);
-        }
-        return removed != null;
-      }
-
-      synchronized void updateHostAttributes(IHostAttributes attributes) {
-        HostOffer offer = offersByHost.remove(attributes.getHost());
-        if (offer != null) {
-          // Remove and re-add a host's offer to re-sort based on its new hostStatus
-          remove(offer.getOffer().getId());
-          add(new HostOffer(offer.getOffer(),  attributes));
-        }
-      }
-
-      synchronized Iterable<HostOffer> getWeaklyConsistentOffers() {
-        return Iterables.unmodifiableIterable(offers);
-      }
-
-      synchronized boolean isStaticallyBanned(HostOffer offer, TaskGroupKey groupKey) {
-        boolean result = staticallyBannedOffers.containsEntry(offer.getOffer().getId(), groupKey);
-        if (LOG.isLoggable(Level.FINE)) {
-          LOG.fine(String.format(
-              "Host offer %s is statically banned for %s: %s",
-              offer,
-              groupKey,
-              result));
-        }
-        return result;
-      }
-
-      synchronized void addStaticGroupBan(HostOffer offer, TaskGroupKey groupKey) {
-        OfferID offerId = offer.getOffer().getId();
-        if (offersById.containsKey(offerId)) {
-          staticallyBannedOffers.put(offerId, groupKey);
-
-          if (LOG.isLoggable(Level.FINE)) {
-            LOG.fine(
-                String.format("Adding static ban for offer: %s, groupKey: %s", offer, groupKey));
-          }
-        }
-      }
-
-      synchronized void clear() {
-        offers.clear();
-        offersById.clear();
-        offersBySlave.clear();
-        offersByHost.clear();
-        staticallyBannedOffers.clear();
-      }
-    }
-
-    @Timed("offer_queue_launch_first")
-    @Override
-    public boolean launchFirst(Function<HostOffer, Assignment> acceptor, TaskGroupKey groupKey)
-        throws LaunchException {
-
-      // It's important that this method is not called concurrently - doing so would open up the
-      // possibility of a race between the same offers being accepted by different threads.
-
-      for (HostOffer offer : hostOffers.getWeaklyConsistentOffers()) {
-        if (!hostOffers.isStaticallyBanned(offer, groupKey)
-            && acceptOffer(offer, acceptor, groupKey)) {
-          return true;
-        }
-      }
-
-      return false;
-    }
-
-    @Timed("offer_queue_accept_offer")
-    protected boolean acceptOffer(
-        HostOffer offer,
-        Function<HostOffer, Assignment> acceptor,
-        TaskGroupKey groupKey) throws LaunchException {
-
-      Assignment assignment = acceptor.apply(offer);
-      switch (assignment.getResult()) {
-
-        case SUCCESS:
-          // Guard against an offer being removed after we grabbed it from the iterator.
-          // If that happens, the offer will not exist in hostOffers, and we can immediately
-          // send it back to LOST for quick reschedule.
-          // Removing while iterating counts on the use of a weakly-consistent iterator being used,
-          // which is a feature of ConcurrentSkipListSet.
-          if (hostOffers.remove(offer.getOffer().getId())) {
-            try {
-              driver.launchTask(offer.getOffer().getId(), assignment.getTaskInfo().get());
-              return true;
-            } catch (IllegalStateException e) {
-              // TODO(William Farner): Catch only the checked exception produced by Driver
-              // once it changes from throwing IllegalStateException when the driver is not yet
-              // registered.
-              throw new LaunchException("Failed to launch task.", e);
-            }
-          } else {
-            offerRaces.incrementAndGet();
-            throw new LaunchException(
-                "Accepted offer no longer exists in offer queue, likely data race.");
-          }
-
-        case FAILURE_STATIC_MISMATCH:
-          // Exclude an offer that results in a static mismatch from further attempts to match
-          // against all tasks from the same group.
-          hostOffers.addStaticGroupBan(offer, groupKey);
-          return false;
-
-        default:
-          return false;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java b/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java
deleted file mode 100644
index 6a8c967..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.Objects;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.Random;
-
-import org.apache.aurora.scheduler.async.OfferManager.OfferReturnDelay;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-/**
- * Returns offers after a random duration within a fixed window.
- */
-@VisibleForTesting
-class RandomJitterReturnDelay implements OfferReturnDelay {
-  private final int minHoldTimeMs;
-  private final int maxJitterWindowMs;
-  private final Random random;
-
-  RandomJitterReturnDelay(int minHoldTimeMs, int maxJitterWindowMs, Random random) {
-    checkArgument(minHoldTimeMs >= 0);
-    checkArgument(maxJitterWindowMs >= 0);
-
-    this.minHoldTimeMs = minHoldTimeMs;
-    this.maxJitterWindowMs = maxJitterWindowMs;
-    this.random = Objects.requireNonNull(random);
-  }
-
-  @Override
-  public Amount<Long, Time> get() {
-    return Amount.of((long) minHoldTimeMs + random.nextInt(maxJitterWindowMs), Time.MILLISECONDS);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
deleted file mode 100644
index 6a0c0a9..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Set;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.BackoffStrategy;
-import com.twitter.common.util.Random;
-
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
-
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.gen.ScheduleStatus.DRAINING;
-import static org.apache.aurora.gen.ScheduleStatus.KILLING;
-import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
-
-/**
- * Calculates scheduling delays for tasks.
- */
-public interface RescheduleCalculator {
-  /**
-   * Calculates the delay, in milliseconds, before the task should be considered eligible for
-   * (re)scheduling at scheduler startup.
-   *
-   * @param task Task to calculate delay for.
-   * @return Delay in msec.
-   */
-  long getStartupScheduleDelayMs(IScheduledTask task);
-
-  /**
-   * Calculates the penalty, in milliseconds, that a task should be penalized before being
-   * eligible for rescheduling.
-   *
-   * @param task Task to calculate delay for.
-   * @return Delay in msec.
-   */
-  long getFlappingPenaltyMs(IScheduledTask task);
-
-  class RescheduleCalculatorImpl implements RescheduleCalculator {
-
-    private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName());
-
-    private final Storage storage;
-    private final RescheduleCalculatorSettings settings;
-    // TODO(wfarner): Inject 'random' in the constructor for better test coverage.
-    private final Random random = new Random.SystemRandom(new java.util.Random());
-
-    private static final Predicate<ScheduleStatus> IS_ACTIVE_STATUS =
-        Predicates.in(Tasks.ACTIVE_STATES);
-
-    private static final Set<ScheduleStatus> INTERRUPTED_TASK_STATES =
-        EnumSet.of(RESTARTING, KILLING, DRAINING);
-
-    private final Predicate<IScheduledTask> flapped = new Predicate<IScheduledTask>() {
-      @Override
-      public boolean apply(IScheduledTask task) {
-        if (!task.isSetTaskEvents()) {
-          return false;
-        }
-
-        List<ITaskEvent> events = Lists.reverse(task.getTaskEvents());
-
-        // Avoid penalizing tasks that were interrupted by outside action, such as a user
-        // restarting them.
-        if (Iterables.any(Iterables.transform(events, Tasks.TASK_EVENT_TO_STATUS),
-            Predicates.in(INTERRUPTED_TASK_STATES))) {
-          return false;
-        }
-
-        ITaskEvent terminalEvent = Iterables.get(events, 0);
-        ScheduleStatus terminalState = terminalEvent.getStatus();
-        Preconditions.checkState(Tasks.isTerminated(terminalState));
-
-        ITaskEvent activeEvent = Iterables.find(
-            events,
-            Predicates.compose(IS_ACTIVE_STATUS, Tasks.TASK_EVENT_TO_STATUS));
-
-        long thresholdMs = settings.flappingTaskThreashold.as(Time.MILLISECONDS);
-
-        return (terminalEvent.getTimestamp() - activeEvent.getTimestamp()) < thresholdMs;
-      }
-    };
-
-    @VisibleForTesting
-    public static class RescheduleCalculatorSettings {
-      private final BackoffStrategy flappingTaskBackoff;
-      private final Amount<Long, Time> flappingTaskThreashold;
-      private final Amount<Integer, Time>  maxStartupRescheduleDelay;
-
-      public RescheduleCalculatorSettings(
-          BackoffStrategy flappingTaskBackoff,
-          Amount<Long, Time> flappingTaskThreashold,
-          Amount<Integer, Time> maxStartupRescheduleDelay) {
-
-        this.flappingTaskBackoff = requireNonNull(flappingTaskBackoff);
-        this.flappingTaskThreashold = requireNonNull(flappingTaskThreashold);
-        this.maxStartupRescheduleDelay = requireNonNull(maxStartupRescheduleDelay);
-      }
-    }
-
-    @Inject
-    RescheduleCalculatorImpl(Storage storage, RescheduleCalculatorSettings settings) {
-      this.storage = requireNonNull(storage);
-      this.settings = requireNonNull(settings);
-    }
-
-    @Override
-    public long getStartupScheduleDelayMs(IScheduledTask task) {
-      return random.nextInt(settings.maxStartupRescheduleDelay.as(Time.MILLISECONDS).intValue())
-          + getFlappingPenaltyMs(task);
-    }
-
-    private Optional<IScheduledTask> getTaskAncestor(IScheduledTask task) {
-      if (!task.isSetAncestorId()) {
-        return Optional.absent();
-      }
-
-      Iterable<IScheduledTask> res =
-          Storage.Util.fetchTasks(storage, Query.taskScoped(task.getAncestorId()));
-
-      return Optional.fromNullable(Iterables.getOnlyElement(res, null));
-    }
-
-    @Override
-    public long getFlappingPenaltyMs(IScheduledTask task) {
-      Optional<IScheduledTask> curTask = getTaskAncestor(task);
-      long penaltyMs = 0;
-      while (curTask.isPresent() && flapped.apply(curTask.get())) {
-        LOG.info(
-            String.format("Ancestor of %s flapped: %s", Tasks.id(task), Tasks.id(curTask.get())));
-        long newPenalty = settings.flappingTaskBackoff.calculateBackoffMs(penaltyMs);
-        // If the backoff strategy is truncated then there is no need for us to continue.
-        if (newPenalty == penaltyMs) {
-          break;
-        }
-        penaltyMs = newPenalty;
-        curTask = getTaskAncestor(curTask.get());
-      }
-
-      return penaltyMs;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java b/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
deleted file mode 100644
index 635419b..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.Queue;
-import java.util.Set;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-
-import org.apache.aurora.scheduler.base.TaskGroupKey;
-
-/**
- * A group of task IDs that are eligible for scheduling, but may be waiting for a backoff to expire.
- */
-class TaskGroup {
-  private final TaskGroupKey key;
-  private long penaltyMs;
-  private final Queue<String> tasks;
-
-  TaskGroup(TaskGroupKey key, String initialTaskId) {
-    this.key = key;
-    this.penaltyMs = 0;
-    this.tasks = Lists.newLinkedList();
-    this.tasks.add(initialTaskId);
-  }
-
-  synchronized TaskGroupKey getKey() {
-    return key;
-  }
-
-  synchronized Optional<String> peek() {
-    return Optional.fromNullable(tasks.peek());
-  }
-
-  synchronized boolean hasMore() {
-    return !tasks.isEmpty();
-  }
-
-  synchronized void remove(String taskId) {
-    tasks.remove(taskId);
-  }
-
-  synchronized void offer(String taskId) {
-    tasks.offer(taskId);
-  }
-
-  synchronized void setPenaltyMs(long penaltyMs) {
-    this.penaltyMs = penaltyMs;
-  }
-
-  // Begin methods used for debug interfaces.
-
-  public synchronized String getName() {
-    return key.toString();
-  }
-
-  public synchronized Set<String> getTaskIds() {
-    return ImmutableSet.copyOf(tasks);
-  }
-
-  public synchronized long getPenaltyMs() {
-    return penaltyMs;
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
deleted file mode 100644
index 1580404..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.eventbus.Subscribe;
-import com.google.common.util.concurrent.RateLimiter;
-import com.twitter.common.application.ShutdownRegistry;
-import com.twitter.common.base.Command;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.SlidingStats;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.util.BackoffStrategy;
-import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
-
-import org.apache.aurora.scheduler.base.AsyncUtil;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-
-import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-
-/**
- * A collection of task groups, where a task group is a collection of tasks that are known to be
- * equal in the way they schedule. This is expected to be tasks associated with the same job key,
- * who also have {@code equal()} {@link org.apache.aurora.scheduler.storage.entities.ITaskConfig}
- * values.
- * <p>
- * This is used to prevent redundant work in trying to schedule tasks as well as to provide
- * nearly-equal responsiveness when scheduling across jobs.  In other words, a 1000 instance job
- * cannot starve a 1 instance job.
- */
-public class TaskGroups implements EventSubscriber {
-
-  private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName());
-
-  private final ConcurrentMap<TaskGroupKey, TaskGroup> groups = Maps.newConcurrentMap();
-  private final ScheduledExecutorService executor;
-  private final TaskScheduler taskScheduler;
-  private final long firstScheduleDelay;
-  private final BackoffStrategy backoff;
-  private final RescheduleCalculator rescheduleCalculator;
-
-  // Track the penalties of tasks at the time they were scheduled. This is to provide data that
-  // may influence the selection of a different backoff strategy.
-  private final SlidingStats scheduledTaskPenalties =
-      new SlidingStats("scheduled_task_penalty", "ms");
-
-  public static class TaskGroupsSettings {
-    private final Amount<Long, Time> firstScheduleDelay;
-    private final BackoffStrategy taskGroupBackoff;
-    private final RateLimiter rateLimiter;
-
-    public TaskGroupsSettings(
-        Amount<Long, Time> firstScheduleDelay,
-        BackoffStrategy taskGroupBackoff,
-        RateLimiter rateLimiter) {
-
-      this.firstScheduleDelay = requireNonNull(firstScheduleDelay);
-      this.taskGroupBackoff = requireNonNull(taskGroupBackoff);
-      this.rateLimiter = requireNonNull(rateLimiter);
-    }
-  }
-
-  @Inject
-  TaskGroups(
-      ShutdownRegistry shutdownRegistry,
-      TaskGroupsSettings settings,
-      TaskScheduler taskScheduler,
-      RescheduleCalculator rescheduleCalculator) {
-
-    this(
-        createThreadPool(shutdownRegistry),
-        settings.firstScheduleDelay,
-        settings.taskGroupBackoff,
-        settings.rateLimiter,
-        taskScheduler,
-        rescheduleCalculator);
-  }
-
-  @VisibleForTesting
-  TaskGroups(
-      final ScheduledExecutorService executor,
-      final Amount<Long, Time> firstScheduleDelay,
-      final BackoffStrategy backoff,
-      final RateLimiter rateLimiter,
-      final TaskScheduler taskScheduler,
-      final RescheduleCalculator rescheduleCalculator) {
-
-    requireNonNull(firstScheduleDelay);
-    Preconditions.checkArgument(firstScheduleDelay.getValue() > 0);
-
-    this.executor = requireNonNull(executor);
-    requireNonNull(rateLimiter);
-    requireNonNull(taskScheduler);
-    this.firstScheduleDelay = firstScheduleDelay.as(Time.MILLISECONDS);
-    this.backoff = requireNonNull(backoff);
-    this.rescheduleCalculator = requireNonNull(rescheduleCalculator);
-
-    this.taskScheduler = new TaskScheduler() {
-      @Override
-      public boolean schedule(String taskId) {
-        rateLimiter.acquire();
-        return taskScheduler.schedule(taskId);
-      }
-    };
-  }
-
-  private synchronized void evaluateGroupLater(Runnable evaluate, TaskGroup group) {
-    // Avoid check-then-act by holding the intrinsic lock.  If not done atomically, we could
-    // remove a group while a task is being added to it.
-    if (group.hasMore()) {
-      executor.schedule(evaluate, group.getPenaltyMs(), MILLISECONDS);
-    } else {
-      groups.remove(group.getKey());
-    }
-  }
-
-  private void startGroup(final TaskGroup group) {
-    Runnable monitor = new Runnable() {
-      @Override
-      public void run() {
-        Optional<String> taskId = group.peek();
-        long penaltyMs = 0;
-        if (taskId.isPresent()) {
-          if (taskScheduler.schedule(taskId.get())) {
-            scheduledTaskPenalties.accumulate(group.getPenaltyMs());
-            group.remove(taskId.get());
-            if (group.hasMore()) {
-              penaltyMs = firstScheduleDelay;
-            }
-          } else {
-            penaltyMs = backoff.calculateBackoffMs(group.getPenaltyMs());
-          }
-        }
-
-        group.setPenaltyMs(penaltyMs);
-        evaluateGroupLater(this, group);
-      }
-    };
-    evaluateGroupLater(monitor, group);
-  }
-
-  private static ScheduledExecutorService createThreadPool(ShutdownRegistry shutdownRegistry) {
-    final ScheduledThreadPoolExecutor executor =
-        AsyncUtil.singleThreadLoggingScheduledExecutor("TaskScheduler-%d", LOG);
-
-    Stats.exportSize("schedule_queue_size", executor.getQueue());
-    shutdownRegistry.addAction(new Command() {
-      @Override
-      public void execute() {
-        new ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute();
-      }
-    });
-    return executor;
-  }
-
-  /**
-   * Informs the task groups of a task state change.
-   * <p>
-   * This is used to observe {@link org.apache.aurora.gen.ScheduleStatus#PENDING} tasks and begin
-   * attempting to schedule them.
-   *
-   * @param stateChange State change notification.
-   */
-  @Subscribe
-  public synchronized void taskChangedState(TaskStateChange stateChange) {
-    if (stateChange.getNewState() == PENDING) {
-      IScheduledTask task = stateChange.getTask();
-      TaskGroupKey key = TaskGroupKey.from(task.getAssignedTask().getTask());
-      TaskGroup newGroup = new TaskGroup(key, Tasks.id(task));
-      TaskGroup existing = groups.putIfAbsent(key, newGroup);
-      if (existing == null) {
-        long penaltyMs;
-        if (stateChange.isTransition()) {
-          penaltyMs = firstScheduleDelay;
-        } else {
-          penaltyMs = rescheduleCalculator.getStartupScheduleDelayMs(task);
-        }
-        newGroup.setPenaltyMs(penaltyMs);
-        startGroup(newGroup);
-      } else {
-        existing.offer(Tasks.id(task));
-      }
-    }
-  }
-
-  /**
-   * Signals the scheduler that tasks have been deleted.
-   *
-   * @param deleted Tasks deleted event.
-   */
-  @Subscribe
-  public synchronized void tasksDeleted(TasksDeleted deleted) {
-    for (IAssignedTask task
-        : Iterables.transform(deleted.getTasks(), Tasks.SCHEDULED_TO_ASSIGNED)) {
-      TaskGroup group = groups.get(TaskGroupKey.from(task.getTask()));
-      if (group != null) {
-        group.remove(task.getTaskId());
-      }
-    }
-  }
-
-  public Iterable<TaskGroup> getGroups() {
-    return ImmutableSet.copyOf(groups.values());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
deleted file mode 100644
index 7b6c063..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.eventbus.Subscribe;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.Clock;
-
-import org.apache.aurora.gen.apiConstants;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-
-/**
- * Prunes tasks in a job based on per-job history and an inactive time threshold by observing tasks
- * transitioning into one of the inactive states.
- */
-public class TaskHistoryPruner implements EventSubscriber {
-  private static final Logger LOG = Logger.getLogger(TaskHistoryPruner.class.getName());
-
-  private final ScheduledExecutorService executor;
-  private final StateManager stateManager;
-  private final Clock clock;
-  private final HistoryPrunnerSettings settings;
-  private final Storage storage;
-
-  private final Predicate<IScheduledTask> safeToDelete = new Predicate<IScheduledTask>() {
-    @Override
-    public boolean apply(IScheduledTask task) {
-      return Tasks.getLatestEvent(task).getTimestamp()
-          <= clock.nowMillis() - settings.minRetentionThresholdMillis;
-    }
-  };
-
-  static class HistoryPrunnerSettings {
-    private final long pruneThresholdMillis;
-    private final long minRetentionThresholdMillis;
-    private final int perJobHistoryGoal;
-
-    HistoryPrunnerSettings(
-        Amount<Long, Time> inactivePruneThreshold,
-        Amount<Long, Time> minRetentionThreshold,
-        int perJobHistoryGoal) {
-
-      this.pruneThresholdMillis = inactivePruneThreshold.as(Time.MILLISECONDS);
-      this.minRetentionThresholdMillis = minRetentionThreshold.as(Time.MILLISECONDS);
-      this.perJobHistoryGoal = perJobHistoryGoal;
-    }
-  }
-
-  @Inject
-  TaskHistoryPruner(
-      final ScheduledExecutorService executor,
-      final StateManager stateManager,
-      final Clock clock,
-      final HistoryPrunnerSettings settings,
-      final Storage storage) {
-
-    this.executor = requireNonNull(executor);
-    this.stateManager = requireNonNull(stateManager);
-    this.clock = requireNonNull(clock);
-    this.settings = requireNonNull(settings);
-    this.storage = requireNonNull(storage);
-  }
-
-  @VisibleForTesting
-  long calculateTimeout(long taskEventTimestampMillis) {
-    return Math.max(
-        settings.minRetentionThresholdMillis,
-        settings.pruneThresholdMillis - Math.max(0, clock.nowMillis() - taskEventTimestampMillis));
-  }
-
-  /**
-   * When triggered, records an inactive task state change.
-   *
-   * @param change Event when a task changes state.
-   */
-  @Subscribe
-  public void recordStateChange(TaskStateChange change) {
-    if (Tasks.isTerminated(change.getNewState())) {
-      long timeoutBasis = change.isTransition()
-          ? clock.nowMillis()
-          : Iterables.getLast(change.getTask().getTaskEvents()).getTimestamp();
-      registerInactiveTask(
-          Tasks.SCHEDULED_TO_JOB_KEY.apply(change.getTask()),
-          change.getTaskId(),
-          calculateTimeout(timeoutBasis));
-    }
-  }
-
-  private void deleteTasks(final Set<String> taskIds) {
-    LOG.info("Pruning inactive tasks " + taskIds);
-    storage.write(new Storage.MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(Storage.MutableStoreProvider storeProvider) {
-        stateManager.deleteTasks(storeProvider, taskIds);
-      }
-    });
-  }
-
-  @VisibleForTesting
-  static Query.Builder jobHistoryQuery(IJobKey jobKey) {
-    return Query.jobScoped(jobKey).byStatus(apiConstants.TERMINAL_STATES);
-  }
-
-  private void registerInactiveTask(
-      final IJobKey jobKey,
-      final String taskId,
-      long timeRemaining) {
-
-    LOG.fine("Prune task " + taskId + " in " + timeRemaining + " ms.");
-    executor.schedule(
-        new Runnable() {
-          @Override
-          public void run() {
-            LOG.info("Pruning expired inactive task " + taskId);
-            deleteTasks(ImmutableSet.of(taskId));
-          }
-        },
-        timeRemaining,
-        TimeUnit.MILLISECONDS);
-
-    executor.submit(new Runnable() {
-      @Override
-      public void run() {
-        Iterable<IScheduledTask> inactiveTasks =
-            Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey));
-        int numInactiveTasks = Iterables.size(inactiveTasks);
-        int tasksToPrune = numInactiveTasks - settings.perJobHistoryGoal;
-        if (tasksToPrune > 0 && numInactiveTasks > settings.perJobHistoryGoal) {
-          Set<String> toPrune = FluentIterable
-              .from(Tasks.LATEST_ACTIVITY.sortedCopy(inactiveTasks))
-              .filter(safeToDelete)
-              .limit(tasksToPrune)
-              .transform(Tasks.SCHEDULED_TO_ID)
-              .toSet();
-          deleteTasks(toPrune);
-        }
-      }
-    });
-  }
-}


Mime
View raw message