Return-Path: X-Original-To: apmail-aurora-commits-archive@minotaur.apache.org Delivered-To: apmail-aurora-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1D782189F5 for ; Wed, 5 Aug 2015 19:40:14 +0000 (UTC) Received: (qmail 14211 invoked by uid 500); 5 Aug 2015 19:40:14 -0000 Delivered-To: apmail-aurora-commits-archive@aurora.apache.org Received: (qmail 14173 invoked by uid 500); 5 Aug 2015 19:40:13 -0000 Mailing-List: contact commits-help@aurora.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.apache.org Delivered-To: mailing list commits@aurora.apache.org Received: (qmail 14164 invoked by uid 99); 5 Aug 2015 19:40:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Aug 2015 19:40:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B5B18DFD9C; Wed, 5 Aug 2015 19:40:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wfarner@apache.org To: commits@aurora.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: aurora git commit: Integrating DelayExecutor into the scheduler's transaction handling. Date: Wed, 5 Aug 2015 19:40:13 +0000 (UTC) Repository: aurora Updated Branches: refs/heads/master ae6e8575b -> 61c63ea9e Integrating DelayExecutor into the scheduler's transaction handling. 
Bugs closed: AURORA-1395 Reviewed at https://reviews.apache.org/r/37049/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/61c63ea9 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/61c63ea9 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/61c63ea9 Branch: refs/heads/master Commit: 61c63ea9e79675bd415fbae0cd5d16f51ebd63f2 Parents: ae6e857 Author: Bill Farner Authored: Wed Aug 5 12:39:52 2015 -0700 Committer: Bill Farner Committed: Wed Aug 5 12:39:52 2015 -0700 ---------------------------------------------------------------------- .../aurora/benchmark/SchedulingBenchmarks.java | 40 ++++------- .../aurora/benchmark/ThriftApiBenchmarks.java | 2 + .../aurora/scheduler/async/AsyncModule.java | 76 ++++++++++++-------- .../scheduler/async/GatedDelayExecutor.java | 7 ++ .../aurora/scheduler/offers/OfferManager.java | 12 ++-- .../scheduler/pruning/TaskHistoryPruner.java | 14 ++-- .../scheduler/reconciliation/KillRetry.java | 11 +-- .../reconciliation/ReconciliationModule.java | 20 ++++++ .../reconciliation/TaskReconciler.java | 4 +- .../scheduler/reconciliation/TaskTimeout.java | 18 +++-- .../aurora/scheduler/scheduling/TaskGroups.java | 64 +++-------------- .../scheduler/scheduling/TaskThrottler.java | 15 ++-- .../aurora/scheduler/storage/db/DbModule.java | 29 +++++--- .../aurora/scheduler/storage/db/DbStorage.java | 23 +++++- .../aurora/scheduler/storage/db/DbUtil.java | 2 + .../aurora/scheduler/async/AsyncModuleTest.java | 6 +- .../scheduler/http/JettyServerModuleTest.java | 5 ++ .../http/api/security/HttpSecurityIT.java | 7 -- .../scheduler/offers/OfferManagerImplTest.java | 6 +- .../pruning/TaskHistoryPrunerTest.java | 62 +++++++++------- .../scheduler/reconciliation/KillRetryTest.java | 17 ++--- .../reconciliation/TaskTimeoutTest.java | 16 ++--- .../scheduler/scheduling/TaskGroupsTest.java | 12 ++-- .../scheduler/scheduling/TaskThrottlerTest.java | 14 ++-- 
.../scheduler/storage/db/DbStorageTest.java | 6 ++ .../testing/FakeScheduledExecutor.java | 35 ++++++++- 26 files changed, 286 insertions(+), 237 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java index d75f090..e41b299 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java +++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java @@ -14,14 +14,11 @@ package org.apache.aurora.benchmark; import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.inject.Singleton; import com.google.common.eventbus.EventBus; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; @@ -39,6 +36,7 @@ import org.apache.aurora.benchmark.fakes.FakeStatsProvider; import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.TaskIdGenerator; import org.apache.aurora.scheduler.async.AsyncModule; +import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent; import org.apache.aurora.scheduler.filter.SchedulingFilter; @@ -90,7 +88,6 @@ public class SchedulingBenchmarks { private static final Amount DELAY_FOREVER = Amount.of(30L, Time.DAYS); protected Storage storage; protected PendingTaskProcessor pendingTaskProcessor; - protected ScheduledThreadPoolExecutor executor; private TaskScheduler taskScheduler; private OfferManager 
offerManager; private EventBus eventBus; @@ -105,11 +102,6 @@ public class SchedulingBenchmarks { eventBus = new EventBus(); final FakeClock clock = new FakeClock(); clock.setNowMillis(System.currentTimeMillis()); - executor = new ScheduledThreadPoolExecutor( - 1, - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("TestProcessor-%d").build()); // TODO(maxim): Find a way to DRY it and reuse existing modules instead. Injector injector = Guice.createInjector( @@ -118,18 +110,23 @@ public class SchedulingBenchmarks { new PrivateModule() { @Override protected void configure() { - bind(ScheduledExecutorService.class) - .annotatedWith(AsyncModule.AsyncExecutor.class) - .toInstance(executor); - bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class); - bind(OfferManager.OfferManagerImpl.class).in(Singleton.class); - bind(OfferManager.OfferReturnDelay.class).toInstance( - new OfferManager.OfferReturnDelay() { + // We use a no-op executor for async work, as this benchmark is focused on the + // synchronous scheduling operations. + bind(DelayExecutor.class).annotatedWith(AsyncModule.AsyncExecutor.class) + .toInstance(new DelayExecutor() { + @Override + public void execute(Runnable work, Amount minDelay) { + // No-op. + } + @Override - public Amount get() { - return DELAY_FOREVER; + public void execute(Runnable command) { + // No-op. } }); + bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class); + bind(OfferManager.OfferManagerImpl.class).in(Singleton.class); + bind(OfferManager.OfferReturnDelay.class).toInstance(() -> DELAY_FOREVER); bind(BiCache.BiCacheSettings.class).toInstance( new BiCache.BiCacheSettings(DELAY_FOREVER, "")); bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class); @@ -183,13 +180,6 @@ public class SchedulingBenchmarks { saveTasks(settings.getTasks()); } - @Setup(Level.Iteration) - public void setUpIteration() { - // Clear executor queue between iterations. 
Otherwise, executor tasks keep piling up and - // affect benchmark performance due to memory pressure and excessive GC. - executor.getQueue().clear(); - } - private Set buildClusterTasks(int numOffers) { int numOffersToFill = (int) Math.round(numOffers * settings.getClusterUtilization()); return new Tasks.Builder() http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java index b2a3e9b..4ddfea2 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java +++ b/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java @@ -32,6 +32,7 @@ import org.apache.aurora.gen.ReadOnlyScheduler; import org.apache.aurora.gen.Response; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.gen.TaskQuery; +import org.apache.aurora.scheduler.async.AsyncModule; import org.apache.aurora.scheduler.cron.CronPredictor; import org.apache.aurora.scheduler.quota.QuotaManager; import org.apache.aurora.scheduler.state.LockManager; @@ -151,6 +152,7 @@ public class ThriftApiBenchmarks { bind(StatsProvider.class).toInstance(new FakeStatsProvider()); } }, + new AsyncModule(), DbModule.productionModule(Bindings.KeyFactory.PLAIN), new ThriftModule.ReadOnly()); } http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java index c345c92..8416ea0 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java +++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java @@ -15,18 
+15,18 @@ package org.apache.aurora.scheduler.async; import java.lang.annotation.Retention; import java.lang.annotation.Target; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.logging.Logger; import javax.inject.Inject; import javax.inject.Qualifier; +import javax.inject.Singleton; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Supplier; import com.google.common.util.concurrent.AbstractIdleService; import com.google.inject.AbstractModule; +import com.google.inject.PrivateModule; import com.twitter.common.args.Arg; import com.twitter.common.args.CmdLine; import com.twitter.common.stats.StatsProvider; @@ -48,61 +48,77 @@ public class AsyncModule extends AbstractModule { @CmdLine(name = "async_worker_threads", help = "The number of worker threads to process async task operations with.") - private static final Arg ASYNC_WORKER_THREADS = Arg.create(1); + private static final Arg ASYNC_WORKER_THREADS = Arg.create(8); + private final ScheduledThreadPoolExecutor afterTransaction; @Qualifier @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) public @interface AsyncExecutor { } - @VisibleForTesting - static final String TIMEOUT_QUEUE_GAUGE = "timeout_queue_size"; + public AsyncModule() { + // Don't worry about clean shutdown, these can be daemon and cleanup-free. + // TODO(wfarner): Should we use a bounded caching thread pool executor instead? + this(AsyncUtil.loggingScheduledExecutor(ASYNC_WORKER_THREADS.get(), "AsyncProcessor-%d", LOG)); + } @VisibleForTesting - static final String ASYNC_TASKS_GAUGE = "async_tasks_completed"; + public AsyncModule(ScheduledThreadPoolExecutor executor) { + this.afterTransaction = requireNonNull(executor); + } @Override protected void configure() { - // Don't worry about clean shutdown, these can be daemon and cleanup-free. 
- final ScheduledThreadPoolExecutor executor = - AsyncUtil.loggingScheduledExecutor(ASYNC_WORKER_THREADS.get(), "AsyncProcessor-%d", LOG); - bind(ScheduledThreadPoolExecutor.class).annotatedWith(AsyncExecutor.class).toInstance(executor); - bind(ScheduledExecutorService.class).annotatedWith(AsyncExecutor.class).toInstance(executor); - bind(ExecutorService.class).annotatedWith(AsyncExecutor.class).toInstance(executor); + install(new PrivateModule() { + @Override + protected void configure() { + bind(ScheduledThreadPoolExecutor.class).toInstance(afterTransaction); + bind(ScheduledExecutorService.class).toInstance(afterTransaction); + + bind(GatedDelayExecutor.class).in(Singleton.class); + expose(GatedDelayExecutor.class); + + bind(RegisterGauges.class).in(Singleton.class); + expose(RegisterGauges.class); + } + }); SchedulerServicesModule.addAppStartupServiceBinding(binder()).to(RegisterGauges.class); + + bind(DelayExecutor.class).annotatedWith(AsyncExecutor.class).to(GatedDelayExecutor.class); + bind(FlushableWorkQueue.class).annotatedWith(AsyncExecutor.class).to(GatedDelayExecutor.class); } static class RegisterGauges extends AbstractIdleService { + @VisibleForTesting + static final String TIMEOUT_QUEUE_GAUGE = "timeout_queue_size"; + + @VisibleForTesting + static final String ASYNC_TASKS_GAUGE = "async_tasks_completed"; + + @VisibleForTesting + static final String DELAY_QUEUE_GAUGE = "delay_executor_queue_size"; + private final StatsProvider statsProvider; private final ScheduledThreadPoolExecutor executor; + private final GatedDelayExecutor delayExecutor; @Inject RegisterGauges( StatsProvider statsProvider, - @AsyncExecutor ScheduledThreadPoolExecutor executor) { + ScheduledThreadPoolExecutor executor, + GatedDelayExecutor delayExecutor) { this.statsProvider = requireNonNull(statsProvider); this.executor = requireNonNull(executor); + this.delayExecutor = requireNonNull(delayExecutor); } @Override protected void startUp() { - statsProvider.makeGauge( - 
TIMEOUT_QUEUE_GAUGE, - new Supplier() { - @Override - public Integer get() { - return executor.getQueue().size(); - } - }); - statsProvider.makeGauge( - ASYNC_TASKS_GAUGE, - new Supplier() { - @Override - public Long get() { - return executor.getCompletedTaskCount(); - } - } - ); + statsProvider.makeGauge(TIMEOUT_QUEUE_GAUGE, () -> executor.getQueue().size()); + statsProvider.makeGauge(ASYNC_TASKS_GAUGE, executor::getCompletedTaskCount); + // Using a lambda rather than method ref to sidestep a bug in PMD that makes it think + // delayExecutor is unused. + statsProvider.makeGauge(DELAY_QUEUE_GAUGE, () -> delayExecutor.getQueueSize()); } @Override http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java b/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java index 1893a9b..2889e79 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java +++ b/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java @@ -16,6 +16,8 @@ package org.apache.aurora.scheduler.async; import java.util.Queue; import java.util.concurrent.ScheduledExecutorService; +import javax.inject.Inject; + import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.twitter.common.quantity.Amount; @@ -36,10 +38,15 @@ class GatedDelayExecutor implements DelayExecutor, FlushableWorkQueue { * * @param delegate Delegate to execute work with when flushed. 
*/ + @Inject GatedDelayExecutor(ScheduledExecutorService delegate) { this.executor = requireNonNull(delegate); } + synchronized int getQueueSize() { + return queue.size(); + } + private synchronized void enqueue(Runnable work) { queue.add(work); } http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java index 4b8a55f..d1ebad1 100644 --- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java +++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java @@ -17,8 +17,6 @@ import java.util.Comparator; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; @@ -43,6 +41,7 @@ import com.twitter.common.stats.Stats; import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; +import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; @@ -159,14 +158,14 @@ public interface OfferManager extends EventSubscriber { private final Driver driver; private final OfferReturnDelay returnDelay; - private final ScheduledExecutorService executor; + private final DelayExecutor executor; @Inject @VisibleForTesting public OfferManagerImpl( Driver driver, OfferReturnDelay returnDelay, - @AsyncExecutor ScheduledExecutorService executor) { + @AsyncExecutor DelayExecutor executor) { 
this.driver = requireNonNull(driver); this.returnDelay = requireNonNull(returnDelay); @@ -190,15 +189,14 @@ public interface OfferManager extends EventSubscriber { removeAndDecline(sameSlave.get().getOffer().getId()); } else { hostOffers.add(offer); - executor.schedule( + executor.execute( new Runnable() { @Override public void run() { removeAndDecline(offer.getOffer().getId()); } }, - returnDelay.get().as(Time.MILLISECONDS), - TimeUnit.MILLISECONDS); + returnDelay.get()); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java index fa9a09c..3cff5bb 100644 --- a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java +++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java @@ -14,8 +14,6 @@ package org.apache.aurora.scheduler.pruning; import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import javax.inject.Inject; @@ -32,6 +30,7 @@ import com.twitter.common.util.Clock; import org.apache.aurora.gen.apiConstants; import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; +import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.state.StateManager; @@ -51,7 +50,7 @@ import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; public class TaskHistoryPruner implements EventSubscriber { private static final Logger LOG = Logger.getLogger(TaskHistoryPruner.class.getName()); - private final ScheduledExecutorService executor; + private final DelayExecutor executor; private final 
StateManager stateManager; private final Clock clock; private final HistoryPrunnerSettings settings; @@ -83,7 +82,7 @@ public class TaskHistoryPruner implements EventSubscriber { @Inject TaskHistoryPruner( - @AsyncExecutor ScheduledExecutorService executor, + @AsyncExecutor DelayExecutor executor, StateManager stateManager, Clock clock, HistoryPrunnerSettings settings, @@ -142,7 +141,7 @@ public class TaskHistoryPruner implements EventSubscriber { long timeRemaining) { LOG.fine("Prune task " + taskId + " in " + timeRemaining + " ms."); - executor.schedule( + executor.execute( new Runnable() { @Override public void run() { @@ -150,10 +149,9 @@ public class TaskHistoryPruner implements EventSubscriber { deleteTasks(ImmutableSet.of(taskId)); } }, - timeRemaining, - TimeUnit.MILLISECONDS); + Amount.of(timeRemaining, Time.MILLISECONDS)); - executor.submit(new Runnable() { + executor.execute(new Runnable() { @Override public void run() { Iterable inactiveTasks = http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java index 1611a3b..b422fa1 100644 --- a/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java +++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java @@ -13,8 +13,6 @@ */ package org.apache.aurora.scheduler.reconciliation; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; @@ -23,11 +21,14 @@ import javax.inject.Inject; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import com.google.common.eventbus.Subscribe; +import 
com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; import com.twitter.common.stats.StatsProvider; import com.twitter.common.util.BackoffStrategy; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; +import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; @@ -48,7 +49,7 @@ public class KillRetry implements EventSubscriber { private final Driver driver; private final Storage storage; - private final ScheduledExecutorService executor; + private final DelayExecutor executor; private final BackoffStrategy backoffStrategy; private final AtomicLong killRetries; @@ -56,7 +57,7 @@ public class KillRetry implements EventSubscriber { KillRetry( Driver driver, Storage storage, - @AsyncExecutor ScheduledExecutorService executor, + @AsyncExecutor DelayExecutor executor, BackoffStrategy backoffStrategy, StatsProvider statsProvider) { @@ -84,7 +85,7 @@ public class KillRetry implements EventSubscriber { void tryLater() { retryInMs.set(backoffStrategy.calculateBackoffMs(retryInMs.get())); - executor.schedule(this, retryInMs.get(), TimeUnit.MILLISECONDS); + executor.execute(this, Amount.of(retryInMs.get(), Time.MILLISECONDS)); } @Override http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java index 406c077..2677238 100644 --- a/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java +++ 
b/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java @@ -13,6 +13,12 @@ */ package org.apache.aurora.scheduler.reconciliation; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; +import java.util.concurrent.ScheduledExecutorService; +import java.util.logging.Logger; + +import javax.inject.Qualifier; import javax.inject.Singleton; import com.google.inject.AbstractModule; @@ -27,14 +33,22 @@ import com.twitter.common.util.BackoffStrategy; import com.twitter.common.util.TruncatedBinaryBackoff; import org.apache.aurora.scheduler.SchedulerServicesModule; +import org.apache.aurora.scheduler.base.AsyncUtil; import org.apache.aurora.scheduler.events.PubsubEventModule; import org.apache.aurora.scheduler.reconciliation.TaskReconciler.TaskReconcilerSettings; +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + /** * Binding module for state reconciliation and retry logic. 
*/ public class ReconciliationModule extends AbstractModule { + private static final Logger LOG = Logger.getLogger(ReconciliationModule.class.getName()); + @CmdLine(name = "transient_task_state_timeout", help = "The amount of time after which to treat a task stuck in a transient state as LOST.") private static final Arg> TRANSIENT_TASK_STATE_TIMEOUT = @@ -73,6 +87,10 @@ public class ReconciliationModule extends AbstractModule { private static final Arg> RECONCILIATION_SCHEDULE_SPREAD = Arg.create(Amount.of(30L, Time.MINUTES)); + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + @interface BackgroundWorker { } + @Override protected void configure() { install(new PrivateModule() { @@ -109,6 +127,8 @@ public class ReconciliationModule extends AbstractModule { RECONCILIATION_EXPLICIT_INTERVAL.get(), RECONCILIATION_IMPLICIT_INTERVAL.get(), RECONCILIATION_SCHEDULE_SPREAD.get())); + bind(ScheduledExecutorService.class).annotatedWith(BackgroundWorker.class) + .toInstance(AsyncUtil.loggingScheduledExecutor(1, "TaskReconciler-%d", LOG)); bind(TaskReconciler.class).in(Singleton.class); expose(TaskReconciler.class); } http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java index 653e52b..8f866a8 100644 --- a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java +++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java @@ -27,10 +27,10 @@ import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Time; import com.twitter.common.stats.StatsProvider; -import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; import org.apache.aurora.scheduler.base.Query; import 
org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.mesos.Driver; +import org.apache.aurora.scheduler.reconciliation.ReconciliationModule.BackgroundWorker; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.mesos.Protos; @@ -91,7 +91,7 @@ public class TaskReconciler extends AbstractIdleService { TaskReconcilerSettings settings, Storage storage, Driver driver, - @AsyncExecutor ScheduledExecutorService executor, + @BackgroundWorker ScheduledExecutorService executor, StatsProvider stats) { this.settings = requireNonNull(settings); http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java index fb83972..72a46f0 100644 --- a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java +++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java @@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.reconciliation; import java.util.EnumSet; import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; @@ -31,6 +30,7 @@ import com.twitter.common.stats.StatsProvider; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; +import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; import org.apache.aurora.scheduler.state.StateChangeResult; @@ -65,7 +65,7 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber { 
ScheduleStatus.KILLING, ScheduleStatus.DRAINING); - private final ScheduledExecutorService executor; + private final DelayExecutor executor; private final Storage storage; private final StateManager stateManager; private final Amount timeout; @@ -73,7 +73,7 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber { @Inject TaskTimeout( - @AsyncExecutor ScheduledExecutorService executor, + @AsyncExecutor DelayExecutor executor, Storage storage, StateManager stateManager, Amount timeout, @@ -138,10 +138,9 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber { // Our service is not yet started. We don't want to lose track of the task, so // we will try again later. LOG.fine("Retrying timeout of task " + taskId + " in " + NOT_STARTED_RETRY); - executor.schedule( - this, - NOT_STARTED_RETRY.getValue(), - NOT_STARTED_RETRY.getUnit().getTimeUnit()); + // TODO(wfarner): This execution should not wait for a transaction, but a second executor + // would be weird. 
+ executor.execute(this, NOT_STARTED_RETRY); } } } @@ -149,10 +148,9 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber { @Subscribe public void recordStateChange(TaskStateChange change) { if (isTransient(change.getNewState())) { - executor.schedule( + executor.execute( new TimedOutTaskHandler(change.getTaskId(), change.getNewState()), - timeout.getValue(), - timeout.getUnit().getTimeUnit()); + timeout); } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java index e60daad..eaf784e 100644 --- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java @@ -14,13 +14,9 @@ package org.apache.aurora.scheduler.scheduling; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.logging.Logger; import javax.inject.Inject; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; @@ -28,16 +24,13 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.eventbus.Subscribe; import com.google.common.util.concurrent.RateLimiter; -import com.twitter.common.application.ShutdownRegistry; -import com.twitter.common.base.Command; import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Time; import com.twitter.common.stats.SlidingStats; -import com.twitter.common.stats.Stats; import com.twitter.common.util.BackoffStrategy; -import 
com.twitter.common.util.concurrent.ExecutorServiceShutdown; -import org.apache.aurora.scheduler.base.AsyncUtil; +import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; +import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; @@ -47,7 +40,6 @@ import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import static java.util.Objects.requireNonNull; -import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.aurora.gen.ScheduleStatus.PENDING; @@ -63,10 +55,8 @@ import static org.apache.aurora.gen.ScheduleStatus.PENDING; */ public class TaskGroups implements EventSubscriber { - private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName()); - private final ConcurrentMap groups = Maps.newConcurrentMap(); - private final ScheduledExecutorService executor; + private final DelayExecutor executor; private final TaskScheduler taskScheduler; private final long firstScheduleDelay; private final BackoffStrategy backoff; @@ -95,43 +85,25 @@ public class TaskGroups implements EventSubscriber { @Inject TaskGroups( - ShutdownRegistry shutdownRegistry, + @AsyncExecutor DelayExecutor executor, TaskGroupsSettings settings, TaskScheduler taskScheduler, RescheduleCalculator rescheduleCalculator) { - this( - createThreadPool(shutdownRegistry), - settings.firstScheduleDelay, - settings.taskGroupBackoff, - settings.rateLimiter, - taskScheduler, - rescheduleCalculator); - } - - @VisibleForTesting - TaskGroups( - final ScheduledExecutorService executor, - final Amount firstScheduleDelay, - final BackoffStrategy backoff, - final RateLimiter rateLimiter, - final TaskScheduler taskScheduler, - final RescheduleCalculator rescheduleCalculator) { - - requireNonNull(firstScheduleDelay); - 
Preconditions.checkArgument(firstScheduleDelay.getValue() > 0); + requireNonNull(settings.firstScheduleDelay); + Preconditions.checkArgument(settings.firstScheduleDelay.getValue() > 0); this.executor = requireNonNull(executor); - requireNonNull(rateLimiter); + requireNonNull(settings.rateLimiter); requireNonNull(taskScheduler); - this.firstScheduleDelay = firstScheduleDelay.as(Time.MILLISECONDS); - this.backoff = requireNonNull(backoff); + this.firstScheduleDelay = settings.firstScheduleDelay.as(Time.MILLISECONDS); + this.backoff = requireNonNull(settings.taskGroupBackoff); this.rescheduleCalculator = requireNonNull(rescheduleCalculator); this.taskScheduler = new TaskScheduler() { @Override public boolean schedule(String taskId) { - rateLimiter.acquire(); + settings.rateLimiter.acquire(); return taskScheduler.schedule(taskId); } }; @@ -141,7 +113,7 @@ public class TaskGroups implements EventSubscriber { // Avoid check-then-act by holding the intrinsic lock. If not done atomically, we could // remove a group while a task is being added to it. if (group.hasMore()) { - executor.schedule(evaluate, group.getPenaltyMs(), MILLISECONDS); + executor.execute(evaluate, Amount.of(group.getPenaltyMs(), Time.MILLISECONDS)); } else { groups.remove(group.getKey()); } @@ -172,20 +144,6 @@ public class TaskGroups implements EventSubscriber { evaluateGroupLater(monitor, group); } - private static ScheduledExecutorService createThreadPool(ShutdownRegistry shutdownRegistry) { - final ScheduledThreadPoolExecutor executor = - AsyncUtil.singleThreadLoggingScheduledExecutor("TaskScheduler-%d", LOG); - - Stats.exportSize("schedule_queue_size", executor.getQueue()); - shutdownRegistry.addAction(new Command() { - @Override - public void execute() { - new ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute(); - } - }); - return executor; - } - /** * Informs the task groups of a task state change. *

http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java index e54e6c4..fdc5bd7 100644 --- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java @@ -13,17 +13,17 @@ */ package org.apache.aurora.scheduler.scheduling; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - import javax.inject.Inject; import com.google.common.base.Optional; import com.google.common.eventbus.Subscribe; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; import com.twitter.common.stats.SlidingStats; import com.twitter.common.util.Clock; import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; +import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; @@ -45,7 +45,7 @@ class TaskThrottler implements EventSubscriber { private final RescheduleCalculator rescheduleCalculator; private final Clock clock; - private final ScheduledExecutorService executor; + private final DelayExecutor executor; private final Storage storage; private final StateManager stateManager; @@ -55,7 +55,7 @@ class TaskThrottler implements EventSubscriber { TaskThrottler( RescheduleCalculator rescheduleCalculator, Clock clock, - @AsyncExecutor ScheduledExecutorService executor, + @AsyncExecutor DelayExecutor executor, Storage storage, StateManager stateManager) { @@ -73,7 +73,7 @@ class TaskThrottler implements EventSubscriber { + 
rescheduleCalculator.getFlappingPenaltyMs(stateChange.getTask()); long delayMs = Math.max(0, readyAtMs - clock.nowMillis()); throttleStats.accumulate(delayMs); - executor.schedule( + executor.execute( new Runnable() { @Override public void run() { @@ -90,8 +90,7 @@ class TaskThrottler implements EventSubscriber { }); } }, - delayMs, - TimeUnit.MILLISECONDS); + Amount.of(delayMs, Time.MILLISECONDS)); } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java index ed92661..f0620b9 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java @@ -30,6 +30,7 @@ import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.PrivateModule; import com.google.inject.TypeLiteral; +import com.google.inject.util.Modules; import com.twitter.common.args.Arg; import com.twitter.common.args.CmdLine; import com.twitter.common.inject.Bindings.KeyFactory; @@ -37,6 +38,8 @@ import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Time; import org.apache.aurora.scheduler.SchedulerServicesModule; +import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; +import org.apache.aurora.scheduler.async.FlushableWorkQueue; import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.CronJobStore; import org.apache.aurora.scheduler.storage.JobUpdateStore; @@ -149,15 +152,23 @@ public final class DbModule extends PrivateModule { */ @VisibleForTesting public static Module testModule(KeyFactory keyFactory, Optional taskStoreModule) { - return new DbModule( - keyFactory, - taskStoreModule.isPresent() ? 
taskStoreModule.get() : getTaskStoreModule(keyFactory), - "testdb-" + UUID.randomUUID().toString(), - // A non-zero close delay is used here to avoid eager database cleanup in tests that - // make use of multiple threads. Since all test databases are separately scoped by the - // included UUID, multiple DB instances will overlap in time but they should be distinct - // in content. - ImmutableMap.of("DB_CLOSE_DELAY", "5")); + return Modules.combine( + new AbstractModule() { + @Override + protected void configure() { + bind(FlushableWorkQueue.class).annotatedWith(AsyncExecutor.class).toInstance(() -> { }); + } + }, + new DbModule( + keyFactory, + taskStoreModule.isPresent() ? taskStoreModule.get() : getTaskStoreModule(keyFactory), + "testdb-" + UUID.randomUUID().toString(), + // A non-zero close delay is used here to avoid eager database cleanup in tests that + // make use of multiple threads. Since all test databases are separately scoped by the + // included UUID, multiple DB instances will overlap in time but they should be distinct + // in content. 
+ ImmutableMap.of("DB_CLOSE_DELAY", "5")) + ); } /** http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java index a1f0d3c..aac62e2 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java @@ -28,6 +28,8 @@ import org.apache.aurora.gen.JobUpdateAction; import org.apache.aurora.gen.JobUpdateStatus; import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; +import org.apache.aurora.scheduler.async.FlushableWorkQueue; import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.CronJobStore; import org.apache.aurora.scheduler.storage.JobUpdateStore; @@ -59,11 +61,13 @@ class DbStorage extends AbstractIdleService implements Storage { private final SqlSessionFactory sessionFactory; private final MutableStoreProvider storeProvider; private final EnumValueMapper enumValueMapper; + private final FlushableWorkQueue postTransactionWork; @Inject DbStorage( SqlSessionFactory sessionFactory, EnumValueMapper enumValueMapper, + @AsyncExecutor FlushableWorkQueue postTransactionWork, final CronJobStore.Mutable cronJobStore, final TaskStore.Mutable taskStore, final SchedulerStore.Mutable schedulerStore, @@ -74,6 +78,7 @@ class DbStorage extends AbstractIdleService implements Storage { this.sessionFactory = requireNonNull(sessionFactory); this.enumValueMapper = requireNonNull(enumValueMapper); + this.postTransactionWork = requireNonNull(postTransactionWork); requireNonNull(cronJobStore); requireNonNull(taskStore); requireNonNull(schedulerStore); @@ -139,11 
+144,23 @@ class DbStorage extends AbstractIdleService implements Storage { @Override @Transactional public T write(MutateWork work) throws StorageException, E { - try { - return work.apply(storeProvider); + T result; + try (SqlSession session = sessionFactory.openSession(false)) { + result = work.apply(storeProvider); + session.commit(); } catch (PersistenceException e) { throw new StorageException(e.getMessage(), e); + } finally { + // NOTE: Async work is intentionally executed regardless of whether the transaction succeeded. + // Doing otherwise runs the risk of cross-talk between transactions and losing async tasks + // due to failure of an unrelated transaction. This matches behavior prior to the + // introduction of DbStorage, but should be revisited. + // TODO(wfarner): Consider revisiting to execute async work only when the transaction is + // successful. + postTransactionWork.flush(); } + + return result; } @VisibleForTesting @@ -169,6 +186,8 @@ class DbStorage extends AbstractIdleService implements Storage { } finally { session.update(ENABLE_UNDO_LOG); } + } finally { + postTransactionWork.flush(); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java index 3a86614..06e7f23 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java @@ -63,6 +63,8 @@ public final class DbUtil { /** * Creates a new, empty test storage system. + *

+ * TODO(wfarner): Rename this to createTestStorage() to avoid misuse. * * @return A new storage instance. */ http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java b/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java index 5384307..9d66685 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java @@ -29,6 +29,7 @@ import com.twitter.common.testing.easymock.EasyMockTest; import com.twitter.common.util.Clock; import org.apache.aurora.scheduler.AppStartup; +import org.apache.aurora.scheduler.async.AsyncModule.RegisterGauges; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; import org.apache.aurora.scheduler.testing.FakeStatsProvider; import org.junit.Before; @@ -86,7 +87,10 @@ public class AsyncModuleTest extends EasyMockTest { injector.getBindings(); assertEquals( - ImmutableMap.of(AsyncModule.TIMEOUT_QUEUE_GAUGE, 0, AsyncModule.ASYNC_TASKS_GAUGE, 0L), + ImmutableMap.of( + RegisterGauges.TIMEOUT_QUEUE_GAUGE, 0, + RegisterGauges.ASYNC_TASKS_GAUGE, 0L, + RegisterGauges.DELAY_QUEUE_GAUGE, 0), statsProvider.getAllValues() ); } http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java b/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java index 91b91bc..6a17d3a 100644 --- a/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java +++ b/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java @@ -41,12 +41,14 @@ import 
com.twitter.common.net.pool.DynamicHostSet; import com.twitter.common.net.pool.DynamicHostSet.HostChangeMonitor; import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Time; +import com.twitter.common.stats.StatsProvider; import com.twitter.common.testing.easymock.EasyMockTest; import com.twitter.common.util.BackoffStrategy; import com.twitter.thrift.ServiceInstance; import org.apache.aurora.gen.ServerInfo; import org.apache.aurora.scheduler.SchedulerServicesModule; +import org.apache.aurora.scheduler.async.AsyncModule; import org.apache.aurora.scheduler.cron.CronJobManager; import org.apache.aurora.scheduler.http.api.GsonMessageBodyHandler; import org.apache.aurora.scheduler.offers.OfferManager; @@ -57,6 +59,7 @@ import org.apache.aurora.scheduler.state.LockManager; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IServerInfo; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; import org.easymock.Capture; import org.junit.Before; @@ -96,6 +99,7 @@ public abstract class JettyServerModuleTest extends EasyMockTest { new StatsModule(), new LifecycleModule(), new SchedulerServicesModule(), + new AsyncModule(), new AbstractModule() { T bindMock(Class clazz) { T mock = createMock(clazz); @@ -105,6 +109,7 @@ public abstract class JettyServerModuleTest extends EasyMockTest { @Override protected void configure() { + bind(StatsProvider.class).toInstance(new FakeStatsProvider()); bind(Storage.class).toInstance(storage.storage); bind(IServerInfo.class).toInstance(IServerInfo.build(new ServerInfo() .setClusterName("unittest") http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/http/api/security/HttpSecurityIT.java ---------------------------------------------------------------------- diff --git 
a/src/test/java/org/apache/aurora/scheduler/http/api/security/HttpSecurityIT.java b/src/test/java/org/apache/aurora/scheduler/http/api/security/HttpSecurityIT.java index e725e10..a5703e5 100644 --- a/src/test/java/org/apache/aurora/scheduler/http/api/security/HttpSecurityIT.java +++ b/src/test/java/org/apache/aurora/scheduler/http/api/security/HttpSecurityIT.java @@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.http.api.security; import java.io.IOException; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableSet; @@ -25,7 +24,6 @@ import com.google.inject.AbstractModule; import com.google.inject.Module; import com.google.inject.util.Modules; import com.sun.jersey.api.client.ClientResponse; -import com.twitter.common.stats.StatsProvider; import org.apache.aurora.gen.AuroraAdmin; import org.apache.aurora.gen.Lock; @@ -62,7 +60,6 @@ import org.junit.Test; import static org.apache.aurora.scheduler.http.H2ConsoleModule.H2_PATH; import static org.apache.aurora.scheduler.http.H2ConsoleModule.H2_PERM; import static org.apache.aurora.scheduler.http.api.ApiModule.API_PATH; -import static org.easymock.EasyMock.anyString; import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -98,7 +95,6 @@ public class HttpSecurityIT extends JettyServerModuleTest { private Ini ini; private AnnotatedAuroraAdmin auroraAdmin; - private StatsProvider statsProvider; private static final Joiner COMMA_JOINER = Joiner.on(", "); private static final String ADMIN_ROLE = "admin"; @@ -138,8 +134,6 @@ public class HttpSecurityIT extends JettyServerModuleTest { roles.put(H2_ROLE, H2_PERM); auroraAdmin = createMock(AnnotatedAuroraAdmin.class); - statsProvider = createMock(StatsProvider.class); - expect(statsProvider.makeCounter(anyString())).andStubReturn(new AtomicLong()); } @Override @@ -152,7 +146,6 @@ public class HttpSecurityIT 
extends JettyServerModuleTest { @Override protected void configure() { MockDecoratedThrift.bindForwardedMock(binder(), auroraAdmin); - bind(StatsProvider.class).toInstance(statsProvider); } }); } http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java index 088a4a6..1cc9ec4 100644 --- a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java @@ -13,7 +13,6 @@ */ package org.apache.aurora.scheduler.offers; -import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Level; import com.google.common.base.Optional; @@ -29,6 +28,7 @@ import org.apache.aurora.gen.JobKey; import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected; import org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged; @@ -85,8 +85,8 @@ public class OfferManagerImplTest extends EasyMockTest { } }); driver = createMock(Driver.class); - ScheduledExecutorService executorMock = createMock(ScheduledExecutorService.class); - clock = FakeScheduledExecutor.scheduleExecutor(executorMock); + DelayExecutor executorMock = createMock(DelayExecutor.class); + clock = FakeScheduledExecutor.fromDelayExecutor(executorMock); addTearDown(new TearDown() { @Override http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java 
---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java index 532b0ea..892861d 100644 --- a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java +++ b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java @@ -14,20 +14,22 @@ package org.apache.aurora.scheduler.pruning; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ScheduledThreadPoolExecutor; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Key; import com.twitter.common.base.Command; import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Time; +import com.twitter.common.stats.StatsProvider; import com.twitter.common.testing.easymock.EasyMockTest; import com.twitter.common.util.testing.FakeClock; @@ -39,6 +41,10 @@ import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.gen.TaskEvent; +import org.apache.aurora.scheduler.async.AsyncModule; +import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; +import org.apache.aurora.scheduler.async.DelayExecutor; +import org.apache.aurora.scheduler.async.FlushableWorkQueue; import org.apache.aurora.scheduler.base.Tasks; import 
org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; import org.apache.aurora.scheduler.pruning.TaskHistoryPruner.HistoryPrunnerSettings; @@ -46,6 +52,7 @@ import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.IAnswer; @@ -72,8 +79,7 @@ public class TaskHistoryPrunerTest extends EasyMockTest { private static final Amount ONE_HOUR = Amount.of(1L, Time.HOURS); private static final int PER_JOB_HISTORY = 2; - private ScheduledFuture future; - private ScheduledExecutorService executor; + private DelayExecutor executor; private FakeClock clock; private StateManager stateManager; private StorageTestUtil storageUtil; @@ -81,8 +87,7 @@ public class TaskHistoryPrunerTest extends EasyMockTest { @Before public void setUp() { - future = createMock(new Clazz>() { }); - executor = createMock(ScheduledExecutorService.class); + executor = createMock(DelayExecutor.class); clock = new FakeClock(); stateManager = createMock(StateManager.class); storageUtil = new StorageTestUtil(this); @@ -232,14 +237,29 @@ public class TaskHistoryPrunerTest extends EasyMockTest { // lock. This causes a deadlock when the executor tries to acquire a lock held by the event // fired. 
- pruner = prunerWithRealExecutor(); + ScheduledThreadPoolExecutor realExecutor = new ScheduledThreadPoolExecutor(1, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("testThreadSafeEvents-executor") + .build()); + Injector injector = Guice.createInjector( + new AsyncModule(realExecutor), + new AbstractModule() { + @Override + protected void configure() { + bind(StatsProvider.class).toInstance(new FakeStatsProvider()); + } + }); + executor = injector.getInstance(Key.get(DelayExecutor.class, AsyncExecutor.class)); + FlushableWorkQueue flusher = + injector.getInstance(Key.get(FlushableWorkQueue.class, AsyncExecutor.class)); + + pruner = buildPruner(executor); Command onDeleted = new Command() { @Override public void execute() { // The goal is to verify that the call does not deadlock. We do not care about the outcome. - IScheduledTask b = makeTask("b", ASSIGNED); - - changeState(b, STARTING); + changeState(makeTask("b", ASSIGNED), STARTING); } }; CountDownLatch taskDeleted = expectTaskDeleted(onDeleted, TASK_ID); @@ -248,17 +268,13 @@ public class TaskHistoryPrunerTest extends EasyMockTest { // Change the task to a terminal state and wait for it to be pruned. changeState(makeTask(TASK_ID, RUNNING), KILLED); + flusher.flush(); taskDeleted.await(); } - private TaskHistoryPruner prunerWithRealExecutor() { - ScheduledExecutorService realExecutor = Executors.newScheduledThreadPool(1, - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("testThreadSafeEvents-executor") - .build()); + private TaskHistoryPruner buildPruner(DelayExecutor delayExecutor) { return new TaskHistoryPruner( - realExecutor, + delayExecutor, stateManager, clock, new HistoryPrunnerSettings(Amount.of(1L, Time.MILLISECONDS), ONE_MS, PER_JOB_HISTORY), @@ -326,7 +342,7 @@ public class TaskHistoryPrunerTest extends EasyMockTest { IScheduledTask... pruned) { // Expect a deferred prune operation when a new task is being watched. 
- executor.submit(EasyMock.anyObject()); + executor.execute(EasyMock.anyObject()); expectLastCall().andAnswer( new IAnswer>() { @Override @@ -348,11 +364,9 @@ public class TaskHistoryPrunerTest extends EasyMockTest { private Capture expectDelayedPrune(long timestampMillis, int count) { Capture capture = createCapture(); - executor.schedule( + executor.execute( EasyMock.capture(capture), - eq(pruner.calculateTimeout(timestampMillis)), - eq(TimeUnit.MILLISECONDS)); - expectLastCall().andReturn(future).times(count); + eq(Amount.of(pruner.calculateTimeout(timestampMillis), Time.MILLISECONDS))); return capture; } http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java b/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java index 26f65fa..957cbd0 100644 --- a/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java +++ b/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java @@ -14,12 +14,10 @@ package org.apache.aurora.scheduler.reconciliation; import java.lang.Thread.UncaughtExceptionHandler; -import java.util.concurrent.ScheduledExecutorService; import javax.inject.Singleton; import com.google.common.eventbus.EventBus; -import com.google.common.testing.TearDown; import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; @@ -34,6 +32,7 @@ import org.apache.aurora.gen.AssignedTask; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; +import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; import 
org.apache.aurora.scheduler.events.PubsubEventModule; @@ -67,14 +66,9 @@ public class KillRetryTest extends EasyMockTest { storageUtil = new StorageTestUtil(this); storageUtil.expectOperations(); backoffStrategy = createMock(BackoffStrategy.class); - final ScheduledExecutorService executorMock = createMock(ScheduledExecutorService.class); - clock = FakeScheduledExecutor.scheduleExecutor(executorMock); - addTearDown(new TearDown() { - @Override - public void tearDown() { - clock.assertEmpty(); - } - }); + final DelayExecutor executorMock = createMock(DelayExecutor.class); + clock = FakeScheduledExecutor.fromDelayExecutor(executorMock); + addTearDown(clock::assertEmpty); statsProvider = new FakeStatsProvider(); Injector injector = Guice.createInjector( @@ -85,8 +79,7 @@ public class KillRetryTest extends EasyMockTest { protected void configure() { bind(Driver.class).toInstance(driver); bind(Storage.class).toInstance(storageUtil.storage); - bind(ScheduledExecutorService.class).annotatedWith(AsyncExecutor.class) - .toInstance(executorMock); + bind(DelayExecutor.class).annotatedWith(AsyncExecutor.class).toInstance(executorMock); PubsubEventModule.bindSubscriber(binder(), KillRetry.class); bind(KillRetry.class).in(Singleton.class); bind(BackoffStrategy.class).toInstance(backoffStrategy); http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java index 97d25f9..2bcda70 100644 --- a/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java +++ b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java @@ -13,8 +13,6 @@ */ package org.apache.aurora.scheduler.reconciliation; -import 
java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicLong; import com.google.common.base.Optional; @@ -30,6 +28,7 @@ import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.gen.TaskEvent; +import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; import org.apache.aurora.scheduler.state.StateChangeResult; import org.apache.aurora.scheduler.state.StateManager; @@ -52,7 +51,6 @@ import static org.apache.aurora.gen.ScheduleStatus.RUNNING; import static org.apache.aurora.gen.ScheduleStatus.STARTING; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; import static org.junit.Assert.assertEquals; public class TaskTimeoutTest extends EasyMockTest { @@ -61,9 +59,8 @@ public class TaskTimeoutTest extends EasyMockTest { private static final Amount TIMEOUT = Amount.of(1L, Time.MINUTES); private AtomicLong timedOutTaskCounter; - private ScheduledExecutorService executor; + private DelayExecutor executor; private StorageTestUtil storageUtil; - private ScheduledFuture future; private StateManager stateManager; private FakeClock clock; private TaskTimeout timeout; @@ -71,10 +68,9 @@ public class TaskTimeoutTest extends EasyMockTest { @Before public void setUp() { - executor = createMock(ScheduledExecutorService.class); + executor = createMock(DelayExecutor.class); storageUtil = new StorageTestUtil(this); storageUtil.expectOperations(); - future = createMock(new Clazz>() { }); stateManager = createMock(StateManager.class); clock = new FakeClock(); statsProvider = createMock(StatsProvider.class); @@ -96,11 +92,7 @@ public class TaskTimeoutTest extends EasyMockTest { private Capture expectTaskWatch(Amount expireIn) { Capture capture = createCapture(); - 
executor.schedule( - EasyMock.capture(capture), - eq((long) expireIn.getValue()), - eq(expireIn.getUnit().getTimeUnit())); - expectLastCall().andReturn(future); + executor.execute(EasyMock.capture(capture), eq(expireIn)); return capture; } http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java index 55aad35..6bcfefd 100644 --- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java +++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java @@ -13,8 +13,6 @@ */ package org.apache.aurora.scheduler.scheduling; -import java.util.concurrent.ScheduledExecutorService; - import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.RateLimiter; import com.twitter.common.quantity.Amount; @@ -27,9 +25,11 @@ import org.apache.aurora.gen.JobKey; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskConfig; +import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted; +import org.apache.aurora.scheduler.scheduling.TaskGroups.TaskGroupsSettings; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.testing.FakeScheduledExecutor; @@ -56,17 +56,15 @@ public class TaskGroupsTest extends EasyMockTest { @Before public void setUp() throws Exception { - ScheduledExecutorService executor = createMock(ScheduledExecutorService.class); - clock = 
FakeScheduledExecutor.scheduleExecutor(executor); + DelayExecutor executor = createMock(DelayExecutor.class); + clock = FakeScheduledExecutor.fromDelayExecutor(executor); backoffStrategy = createMock(BackoffStrategy.class); taskScheduler = createMock(TaskScheduler.class); rateLimiter = createMock(RateLimiter.class); rescheduleCalculator = createMock(RescheduleCalculator.class); taskGroups = new TaskGroups( executor, - FIRST_SCHEDULE_DELAY, - backoffStrategy, - rateLimiter, + new TaskGroupsSettings(FIRST_SCHEDULE_DELAY, backoffStrategy, rateLimiter), taskScheduler, rescheduleCalculator); } http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java index 4055021..ba08fe5 100644 --- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java +++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java @@ -13,9 +13,6 @@ */ package org.apache.aurora.scheduler.scheduling; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.twitter.common.quantity.Amount; @@ -27,6 +24,7 @@ import org.apache.aurora.gen.AssignedTask; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskEvent; +import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; import org.apache.aurora.scheduler.state.StateChangeResult; @@ -49,7 +47,7 @@ public class TaskThrottlerTest extends EasyMockTest { private RescheduleCalculator 
rescheduleCalculator; private FakeClock clock; - private ScheduledExecutorService executor; + private DelayExecutor executor; private StorageTestUtil storageUtil; private StateManager stateManager; private TaskThrottler throttler; @@ -58,7 +56,7 @@ public class TaskThrottlerTest extends EasyMockTest { public void setUp() throws Exception { rescheduleCalculator = createMock(RescheduleCalculator.class); clock = new FakeClock(); - executor = createMock(ScheduledExecutorService.class); + executor = createMock(DelayExecutor.class); storageUtil = new StorageTestUtil(this); storageUtil.expectOperations(); stateManager = createMock(StateManager.class); @@ -116,11 +114,9 @@ public class TaskThrottlerTest extends EasyMockTest { private Capture expectThrottled(long penaltyMs) { Capture stateChangeCapture = createCapture(); - expect(executor.schedule( + executor.execute( capture(stateChangeCapture), - eq(penaltyMs), - eq(TimeUnit.MILLISECONDS))) - .andReturn(null); + eq(Amount.of(penaltyMs, Time.MILLISECONDS))); return stateChangeCapture; } http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java index a4bcdd7..3b05db9 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java @@ -15,6 +15,7 @@ package org.apache.aurora.scheduler.storage.db; import com.twitter.common.testing.easymock.EasyMockTest; +import org.apache.aurora.scheduler.async.FlushableWorkQueue; import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.CronJobStore; import org.apache.aurora.scheduler.storage.JobUpdateStore; @@ -40,6 +41,7 @@ public class DbStorageTest 
extends EasyMockTest { private SqlSessionFactory sessionFactory; private SqlSession session; private EnumValueMapper enumMapper; + private FlushableWorkQueue flusher; private Work.Quiet readWork; private MutateWork.NoResult.Quiet writeWork; @@ -50,12 +52,14 @@ public class DbStorageTest extends EasyMockTest { sessionFactory = createMock(SqlSessionFactory.class); session = createMock(SqlSession.class); enumMapper = createMock(EnumValueMapper.class); + flusher = createMock(FlushableWorkQueue.class); readWork = createMock(new Clazz>() { }); writeWork = createMock(new Clazz() { }); storage = new DbStorage( sessionFactory, enumMapper, + flusher, createMock(CronJobStore.Mutable.class), createMock(TaskStore.Mutable.class), createMock(SchedulerStore.Mutable.class), @@ -89,6 +93,7 @@ public class DbStorageTest extends EasyMockTest { expect(sessionFactory.openSession(false)).andReturn(session); expect(session.update(DbStorage.DISABLE_UNDO_LOG)).andThrow(new PersistenceException()); expect(session.update(DbStorage.ENABLE_UNDO_LOG)).andReturn(0); + flusher.flush(); control.replay(); @@ -102,6 +107,7 @@ public class DbStorageTest extends EasyMockTest { expect(writeWork.apply(EasyMock.anyObject())).andReturn(null); session.close(); expect(session.update(DbStorage.ENABLE_UNDO_LOG)).andReturn(0); + flusher.flush(); control.replay(); http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java b/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java index e198c1c..48978ec 100644 --- a/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java +++ b/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java @@ -27,6 +27,7 @@ import com.twitter.common.quantity.Amount; import 
com.twitter.common.quantity.Time; import com.twitter.common.util.testing.FakeClock; +import org.apache.aurora.scheduler.async.DelayExecutor; import org.easymock.EasyMock; import org.easymock.IAnswer; @@ -43,6 +44,7 @@ public final class FakeScheduledExecutor extends FakeClock { private FakeScheduledExecutor() { } + // TODO(wfarner): Rename to fromScheduledExecutor(). public static FakeScheduledExecutor scheduleExecutor(ScheduledExecutorService mock) { FakeScheduledExecutor executor = new FakeScheduledExecutor(); mock.schedule( @@ -57,6 +59,33 @@ public final class FakeScheduledExecutor extends FakeClock { return executor; } + private static IAnswer<Object> answerExecuteWithDelay(final FakeScheduledExecutor executor) { + return new IAnswer<Object>() { + @Override + public Object answer() { + Object[] args = EasyMock.getCurrentArguments(); + Runnable work = (Runnable) args[0]; + @SuppressWarnings("unchecked") + Amount<Long, Time> delay = (Amount<Long, Time>) args[1]; + addDelayedWork(executor, delay.as(Time.MILLISECONDS), work); + return null; + } + }; + } + + public static FakeScheduledExecutor fromDelayExecutor(DelayExecutor mock) { + FakeScheduledExecutor executor = new FakeScheduledExecutor(); + mock.execute( + EasyMock.anyObject(), + EasyMock.<Amount<Long, Time>>anyObject()); + expectLastCall().andAnswer(answerExecuteWithDelay(executor)).anyTimes(); + + mock.execute(EasyMock.anyObject()); + expectLastCall().andAnswer(answerExecute()).anyTimes(); + + return executor; + } + private static IAnswer answerExecute() { return new IAnswer() { @Override @@ -69,10 +98,10 @@ public final class FakeScheduledExecutor extends FakeClock { }; } - private static IAnswer<ScheduledFuture<?>> answerSchedule(final FakeScheduledExecutor executor) { - return new IAnswer<ScheduledFuture<?>>() { + private static IAnswer<Object> answerSchedule(final FakeScheduledExecutor executor) { + return new IAnswer<Object>() { @Override - public ScheduledFuture<?> answer() { + public Object answer() { Object[] args = EasyMock.getCurrentArguments(); Runnable work = (Runnable) args[0]; long value = (Long) args[1];