Return-Path: X-Original-To: apmail-aurora-commits-archive@minotaur.apache.org Delivered-To: apmail-aurora-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1CB361748A for ; Fri, 20 Mar 2015 17:15:22 +0000 (UTC) Received: (qmail 78633 invoked by uid 500); 20 Mar 2015 17:15:22 -0000 Delivered-To: apmail-aurora-commits-archive@aurora.apache.org Received: (qmail 78601 invoked by uid 500); 20 Mar 2015 17:15:21 -0000 Mailing-List: contact commits-help@aurora.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.incubator.apache.org Delivered-To: mailing list commits@aurora.incubator.apache.org Received: (qmail 78592 invoked by uid 99); 20 Mar 2015 17:15:21 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Mar 2015 17:15:21 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 20 Mar 2015 17:15:19 +0000 Received: (qmail 77969 invoked by uid 99); 20 Mar 2015 17:14:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Mar 2015 17:14:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B9BF7E181F; Fri, 20 Mar 2015 17:14:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: maxim@apache.org To: commits@aurora.incubator.apache.org Message-Id: <38ef6eb09fd64f24bebc738a0662a7b1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-aurora git commit: Adding preemptor jmh benchmark Date: Fri, 20 Mar 2015 17:14:59 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-aurora Updated Branches: refs/heads/master 9473549b9 -> 91aec8c47 Adding preemptor jmh benchmark Bugs closed: AURORA-1158 Reviewed at https://reviews.apache.org/r/32225/ Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/91aec8c4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/91aec8c4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/91aec8c4 Branch: refs/heads/master Commit: 91aec8c472c1505282386212df40d5a12deb9672 Parents: 9473549 Author: Maxim Khutornenko Authored: Fri Mar 20 10:12:22 2015 -0700 Committer: Maxim Khutornenko Committed: Fri Mar 20 10:12:22 2015 -0700 ---------------------------------------------------------------------- .../aurora/benchmark/SchedulingBenchmarks.java | 113 ++++++++++++++----- .../async/preemptor/PreemptorModule.java | 21 +++- .../async/preemptor/PreemptorModuleTest.java | 9 +- 3 files changed, 111 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/91aec8c4/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java index ad49eff..5309e81 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java +++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java @@ -19,6 +19,9 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import javax.inject.Singleton; +import com.google.common.base.Optional; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.eventbus.EventBus; @@ -26,6 +29,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; +import com.google.inject.PrivateModule; import com.google.inject.TypeLiteral; import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Time; @@ -42,21 +46,20 @@ import org.apache.aurora.scheduler.async.OfferManager; import org.apache.aurora.scheduler.async.RescheduleCalculator; import org.apache.aurora.scheduler.async.TaskScheduler; import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl.ReservationDuration; -import org.apache.aurora.scheduler.async.preemptor.ClusterState; import org.apache.aurora.scheduler.async.preemptor.ClusterStateImpl; -import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder; -import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlotFinderImpl; import org.apache.aurora.scheduler.async.preemptor.Preemptor; -import org.apache.aurora.scheduler.async.preemptor.PreemptorImpl; -import org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics; +import org.apache.aurora.scheduler.async.preemptor.PreemptorModule; +import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent; +import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.filter.SchedulingFilterImpl; import org.apache.aurora.scheduler.mesos.Driver; import org.apache.aurora.scheduler.mesos.ExecutorSettings; import org.apache.aurora.scheduler.state.StateModule; import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.mem.MemStorage; @@ -76,7 +79,9 @@ public class SchedulingBenchmarks { */ @State(Scope.Thread) public abstract static class AbstractBase { - private Storage storage; + protected Storage storage; + protected Preemptor preemptor; + protected ScheduledThreadPoolExecutor executor; private TaskScheduler taskScheduler; private OfferManager offerManager; private EventBus eventBus; @@ -86,25 +91,25 @@ public class SchedulingBenchmarks { * Runs once to setup up benchmark state. */ @Setup(Level.Trial) - public void prepare() { + public void setUpBenchmark() { storage = MemStorage.newEmptyStorage(); eventBus = new EventBus(); final FakeClock clock = new FakeClock(); clock.setNowMillis(System.currentTimeMillis()); + executor = new ScheduledThreadPoolExecutor( + 1, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("TestProcessor-%d").build()); // TODO(maxim): Find a way to DRY it and reuse existing modules instead. Injector injector = Guice.createInjector( new StateModule(), - new AbstractModule() { + new PreemptorModule(true, Amount.of(0L, Time.MILLISECONDS), executor), + new PrivateModule() { @Override protected void configure() { - final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor( - 1, - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("TestProcessor-%d").build()); bind(ScheduledExecutorService.class).toInstance(executor); - bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class); bind(OfferManager.OfferManagerImpl.class).in(Singleton.class); bind(OfferManager.OfferReturnDelay.class).toInstance( @@ -115,6 +120,12 @@ public class SchedulingBenchmarks { } }); + expose(OfferManager.class); + } + }, + new AbstractModule() { + @Override + protected void configure() { bind(new TypeLiteral>() { }) .annotatedWith(ReservationDuration.class) .toInstance(Amount.of(30L, Time.DAYS)); @@ -129,17 +140,6 @@ public class SchedulingBenchmarks { .setThermosObserverRoot("/var/run/thermos") .build()); - bind(PreemptorMetrics.class).in(Singleton.class); - bind(PreemptionSlotFinder.class).to(PreemptionSlotFinderImpl.class); - bind(PreemptionSlotFinderImpl.class).in(Singleton.class); - bind(Preemptor.class).to(PreemptorImpl.class); - bind(PreemptorImpl.class).in(Singleton.class); - bind(new TypeLiteral>() { }) - .annotatedWith(PreemptorImpl.PreemptionDelay.class) - .toInstance(Amount.of(0L, Time.MILLISECONDS)); - bind(ClusterState.class).to(ClusterStateImpl.class); - bind(ClusterStateImpl.class).in(Singleton.class); - bind(Storage.class).toInstance(storage); bind(Driver.class).toInstance(new FakeDriver()); bind(RescheduleCalculator.class).toInstance(new FakeRescheduleCalculator()); @@ -157,6 +157,7 @@ public class SchedulingBenchmarks { taskScheduler = injector.getInstance(TaskScheduler.class); offerManager = injector.getInstance(OfferManager.class); + preemptor = injector.getInstance(Preemptor.class); eventBus.register(injector.getInstance(ClusterStateImpl.class)); settings = getSettings(); @@ -169,6 +170,13 @@ public class SchedulingBenchmarks { saveTasks(ImmutableSet.of(settings.getTask())); } + @Setup(Level.Iteration) + public void setUpIteration() { + // Clear executor queue between iterations. Otherwise, executor tasks keep piling up and + // affect benchmark performance due to memory pressure and excessive GC. + executor.getQueue().clear(); + } + private Set buildClusterTasks(int numOffers) { int numOffersToFill = (int) Math.round(numOffers * settings.getClusterUtilization()); return new Tasks.Builder() @@ -270,9 +278,9 @@ public class SchedulingBenchmarks { /** * Tests scheduling performance with a large number of tasks and slaves where the cluster - * is completely filled up and preemptor is invoked for all slaves in the cluster. + * is completely filled up. */ - public static class PreemptorFallbackForLargeClusterBenchmark extends AbstractBase { + public static class ClusterFullUtilizationBenchmark extends AbstractBase { @Override protected BenchmarkSettings getSettings() { return new BenchmarkSettings.Builder() @@ -286,4 +294,55 @@ public class SchedulingBenchmarks { .build(1))).build(); } } + + /** + * Tests preemptor searching for a preemption slot in a completely filled up cluster. + */ + public static class PreemptorSlotSearchBenchmark extends AbstractBase { + + @Override + protected BenchmarkSettings getSettings() { + return new BenchmarkSettings.Builder() + .setClusterUtilization(1.0) + .setHostAttributes(new Hosts.Builder().setNumHostsPerRack(2).build(1000)) + .setTask(Iterables.getOnlyElement(new Tasks.Builder() + .setProduction(true) + .addValueConstraint("host", "denied") + .setTaskIdFormat("test-%s") + .build(1))).build(); + } + + @Override + public boolean runBenchmark() { + return storage.write(new Storage.MutateWork.Quiet() { + @Override + public Boolean apply(final Storage.MutableStoreProvider storeProvider) { + IAssignedTask assignedTask = getSettings().getTask().getAssignedTask(); + final Query.Builder query = Query.jobScoped(assignedTask.getTask().getJob()) + .byStatus(org.apache.aurora.scheduler.base.Tasks.SLAVE_ASSIGNED_STATES); + + Supplier> taskSupplier = Suppliers.memoize( + new Supplier>() { + @Override + public ImmutableSet get() { + return storeProvider.getTaskStore().fetchTasks(query); + } + }); + + AttributeAggregate aggregate = + new AttributeAggregate(taskSupplier, storeProvider.getAttributeStore()); + + Optional result = + preemptor.attemptPreemptionFor(assignedTask.getTaskId(), aggregate); + + while (executor.getActiveCount() > 0) { + // Using a tight loop to wait for a search completion. This is executed on a benchmark + // main thread and does not affect test results. + } + + return result.isPresent(); + } + }); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/91aec8c4/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java index f817ccd..7034a07 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java +++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java @@ -32,6 +32,8 @@ import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.Preempti import org.apache.aurora.scheduler.events.PubsubEventModule; import org.apache.aurora.scheduler.filter.AttributeAggregate; +import static java.util.Objects.requireNonNull; + import static org.apache.aurora.scheduler.base.AsyncUtil.singleThreadLoggingScheduledExecutor; public class PreemptorModule extends AbstractModule { @@ -53,14 +55,25 @@ public class PreemptorModule extends AbstractModule { Arg.create(Amount.of(3L, Time.MINUTES)); private final boolean enablePreemptor; + private final Amount preemptionDelay; + private final ScheduledExecutorService executor; @VisibleForTesting - PreemptorModule(boolean enablePreemptor) { + public PreemptorModule( + boolean enablePreemptor, + Amount preemptionDelay, + ScheduledExecutorService executor) { + this.enablePreemptor = enablePreemptor; + this.preemptionDelay = requireNonNull(preemptionDelay); + this.executor = requireNonNull(executor); } public PreemptorModule() { - this(ENABLE_PREEMPTOR.get()); + this( + ENABLE_PREEMPTOR.get(), + PREEMPTION_DELAY.get(), + singleThreadLoggingScheduledExecutor("PreemptorProcessor-%d", LOG)); } @Override @@ -72,7 +85,7 @@ public class PreemptorModule extends AbstractModule { LOG.info("Preemptor Enabled."); bind(ScheduledExecutorService.class) .annotatedWith(PreemptorImpl.PreemptionExecutor.class) - .toInstance(singleThreadLoggingScheduledExecutor("PreemptorProcessor-%d", LOG)); + .toInstance(executor); bind(PreemptorMetrics.class).in(Singleton.class); bind(PreemptionSlotFinder.class).to(PreemptionSlotFinderImpl.class); bind(PreemptionSlotFinderImpl.class).in(Singleton.class); @@ -80,7 +93,7 @@ public class PreemptorModule extends AbstractModule { bind(PreemptorImpl.class).in(Singleton.class); bind(new TypeLiteral>() { }) .annotatedWith(PreemptorImpl.PreemptionDelay.class) - .toInstance(PREEMPTION_DELAY.get()); + .toInstance(preemptionDelay); bind(new TypeLiteral>() { }) .annotatedWith(PreemptionSlotCache.PreemptionSlotHoldDuration.class) .toInstance(PREEMPTION_SLOT_HOLD_TIME.get()); http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/91aec8c4/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java index c9d10e4..0e2e958 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java @@ -13,6 +13,8 @@ */ package org.apache.aurora.scheduler.async.preemptor; +import java.util.concurrent.ScheduledExecutorService; + import com.google.common.base.Optional; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableSet; @@ -24,6 +26,8 @@ import com.google.inject.Module; import com.twitter.common.application.StartupStage; import com.twitter.common.application.modules.LifecycleModule; import com.twitter.common.base.ExceptionalCommand; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; import com.twitter.common.testing.easymock.EasyMockTest; import org.apache.aurora.scheduler.filter.AttributeAggregate; @@ -71,7 +75,10 @@ public class PreemptorModuleTest extends EasyMockTest { @Test public void testPreemptorDisabled() throws Exception { - Injector injector = createInjector(new PreemptorModule(false)); + Injector injector = createInjector(new PreemptorModule( + false, + Amount.of(0L, Time.SECONDS), + createMock(ScheduledExecutorService.class))); Supplier> taskSupplier = createMock(new EasyMockTest.Clazz>>() { });