aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject incubator-aurora git commit: Adding preemptor jmh benchmark
Date Fri, 20 Mar 2015 17:14:59 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 9473549b9 -> 91aec8c47


Adding preemptor jmh benchmark

Bugs closed: AURORA-1158

Reviewed at https://reviews.apache.org/r/32225/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/91aec8c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/91aec8c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/91aec8c4

Branch: refs/heads/master
Commit: 91aec8c472c1505282386212df40d5a12deb9672
Parents: 9473549
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Fri Mar 20 10:12:22 2015 -0700
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Fri Mar 20 10:12:22 2015 -0700

----------------------------------------------------------------------
 .../aurora/benchmark/SchedulingBenchmarks.java  | 113 ++++++++++++++-----
 .../async/preemptor/PreemptorModule.java        |  21 +++-
 .../async/preemptor/PreemptorModuleTest.java    |   9 +-
 3 files changed, 111 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/91aec8c4/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index ad49eff..5309e81 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -19,6 +19,9 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import javax.inject.Singleton;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.eventbus.EventBus;
@@ -26,6 +29,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
+import com.google.inject.PrivateModule;
 import com.google.inject.TypeLiteral;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
@@ -42,21 +46,20 @@ import org.apache.aurora.scheduler.async.OfferManager;
 import org.apache.aurora.scheduler.async.RescheduleCalculator;
 import org.apache.aurora.scheduler.async.TaskScheduler;
 import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl.ReservationDuration;
-import org.apache.aurora.scheduler.async.preemptor.ClusterState;
 import org.apache.aurora.scheduler.async.preemptor.ClusterStateImpl;
-import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder;
-import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlotFinderImpl;
 import org.apache.aurora.scheduler.async.preemptor.Preemptor;
-import org.apache.aurora.scheduler.async.preemptor.PreemptorImpl;
-import org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics;
+import org.apache.aurora.scheduler.async.preemptor.PreemptorModule;
+import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
 import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.mesos.ExecutorSettings;
 import org.apache.aurora.scheduler.state.StateModule;
 import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.mem.MemStorage;
@@ -76,7 +79,9 @@ public class SchedulingBenchmarks {
    */
   @State(Scope.Thread)
   public abstract static class AbstractBase {
-    private Storage storage;
+    protected Storage storage;
+    protected Preemptor preemptor;
+    protected ScheduledThreadPoolExecutor executor;
     private TaskScheduler taskScheduler;
     private OfferManager offerManager;
     private EventBus eventBus;
@@ -86,25 +91,25 @@ public class SchedulingBenchmarks {
      * Runs once to setup up benchmark state.
      */
     @Setup(Level.Trial)
-    public void prepare() {
+    public void setUpBenchmark() {
       storage = MemStorage.newEmptyStorage();
       eventBus = new EventBus();
       final FakeClock clock = new FakeClock();
       clock.setNowMillis(System.currentTimeMillis());
+      executor = new ScheduledThreadPoolExecutor(
+          1,
+          new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setNameFormat("TestProcessor-%d").build());
 
       // TODO(maxim): Find a way to DRY it and reuse existing modules instead.
       Injector injector = Guice.createInjector(
           new StateModule(),
-          new AbstractModule() {
+          new PreemptorModule(true, Amount.of(0L, Time.MILLISECONDS), executor),
+          new PrivateModule() {
             @Override
             protected void configure() {
-              final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
-                  1,
-                  new ThreadFactoryBuilder()
-                      .setDaemon(true)
-                      .setNameFormat("TestProcessor-%d").build());
               bind(ScheduledExecutorService.class).toInstance(executor);
-
               bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class);
               bind(OfferManager.OfferManagerImpl.class).in(Singleton.class);
               bind(OfferManager.OfferReturnDelay.class).toInstance(
@@ -115,6 +120,12 @@ public class SchedulingBenchmarks {
                     }
                   });
 
+              expose(OfferManager.class);
+            }
+          },
+          new AbstractModule() {
+            @Override
+            protected void configure() {
               bind(new TypeLiteral<Amount<Long, Time>>() { })
                   .annotatedWith(ReservationDuration.class)
                   .toInstance(Amount.of(30L, Time.DAYS));
@@ -129,17 +140,6 @@ public class SchedulingBenchmarks {
                       .setThermosObserverRoot("/var/run/thermos")
                       .build());
 
-              bind(PreemptorMetrics.class).in(Singleton.class);
-              bind(PreemptionSlotFinder.class).to(PreemptionSlotFinderImpl.class);
-              bind(PreemptionSlotFinderImpl.class).in(Singleton.class);
-              bind(Preemptor.class).to(PreemptorImpl.class);
-              bind(PreemptorImpl.class).in(Singleton.class);
-              bind(new TypeLiteral<Amount<Long, Time>>() { })
-                  .annotatedWith(PreemptorImpl.PreemptionDelay.class)
-                  .toInstance(Amount.of(0L, Time.MILLISECONDS));
-              bind(ClusterState.class).to(ClusterStateImpl.class);
-              bind(ClusterStateImpl.class).in(Singleton.class);
-
               bind(Storage.class).toInstance(storage);
               bind(Driver.class).toInstance(new FakeDriver());
               bind(RescheduleCalculator.class).toInstance(new FakeRescheduleCalculator());
@@ -157,6 +157,7 @@ public class SchedulingBenchmarks {
 
       taskScheduler = injector.getInstance(TaskScheduler.class);
       offerManager = injector.getInstance(OfferManager.class);
+      preemptor = injector.getInstance(Preemptor.class);
       eventBus.register(injector.getInstance(ClusterStateImpl.class));
 
       settings = getSettings();
@@ -169,6 +170,13 @@ public class SchedulingBenchmarks {
       saveTasks(ImmutableSet.of(settings.getTask()));
     }
 
+    @Setup(Level.Iteration)
+    public void setUpIteration() {
+      // Clear executor queue between iterations. Otherwise, executor tasks keep piling up and
+      // affect benchmark performance due to memory pressure and excessive GC.
+      executor.getQueue().clear();
+    }
+
     private Set<IScheduledTask> buildClusterTasks(int numOffers) {
       int numOffersToFill = (int) Math.round(numOffers * settings.getClusterUtilization());
       return new Tasks.Builder()
@@ -270,9 +278,9 @@ public class SchedulingBenchmarks {
 
   /**
    * Tests scheduling performance with a large number of tasks and slaves where the cluster
-   * is completely filled up and preemptor is invoked for all slaves in the cluster.
+   * is completely filled up.
    */
-  public static class PreemptorFallbackForLargeClusterBenchmark extends AbstractBase {
+  public static class ClusterFullUtilizationBenchmark extends AbstractBase {
     @Override
     protected BenchmarkSettings getSettings() {
       return new BenchmarkSettings.Builder()
@@ -286,4 +294,55 @@ public class SchedulingBenchmarks {
               .build(1))).build();
     }
   }
+
+  /**
+   * Tests preemptor searching for a preemption slot in a completely filled up cluster.
+   */
+  public static class PreemptorSlotSearchBenchmark extends AbstractBase {
+
+    @Override
+    protected BenchmarkSettings getSettings() {
+      return new BenchmarkSettings.Builder()
+          .setClusterUtilization(1.0)
+          .setHostAttributes(new Hosts.Builder().setNumHostsPerRack(2).build(1000))
+          .setTask(Iterables.getOnlyElement(new Tasks.Builder()
+              .setProduction(true)
+              .addValueConstraint("host", "denied")
+              .setTaskIdFormat("test-%s")
+              .build(1))).build();
+    }
+
+    @Override
+    public boolean runBenchmark() {
+      return storage.write(new Storage.MutateWork.Quiet<Boolean>() {
+        @Override
+        public Boolean apply(final Storage.MutableStoreProvider storeProvider) {
+          IAssignedTask assignedTask = getSettings().getTask().getAssignedTask();
+          final Query.Builder query = Query.jobScoped(assignedTask.getTask().getJob())
+              .byStatus(org.apache.aurora.scheduler.base.Tasks.SLAVE_ASSIGNED_STATES);
+
+          Supplier<ImmutableSet<IScheduledTask>> taskSupplier = Suppliers.memoize(
+              new Supplier<ImmutableSet<IScheduledTask>>() {
+                @Override
+                public ImmutableSet<IScheduledTask> get() {
+                  return storeProvider.getTaskStore().fetchTasks(query);
+                }
+              });
+
+          AttributeAggregate aggregate =
+              new AttributeAggregate(taskSupplier, storeProvider.getAttributeStore());
+
+          Optional<String> result =
+              preemptor.attemptPreemptionFor(assignedTask.getTaskId(), aggregate);
+
+          while (executor.getActiveCount() > 0) {
+            // Using a tight loop to wait for a search completion. This is executed on a benchmark
+            // main thread and does not affect test results.
+          }
+
+          return result.isPresent();
+        }
+      });
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/91aec8c4/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
index f817ccd..7034a07 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
@@ -32,6 +32,8 @@ import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.Preempti
 import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 
+import static java.util.Objects.requireNonNull;
+
 import static org.apache.aurora.scheduler.base.AsyncUtil.singleThreadLoggingScheduledExecutor;
 
 public class PreemptorModule extends AbstractModule {
@@ -53,14 +55,25 @@ public class PreemptorModule extends AbstractModule {
       Arg.create(Amount.of(3L, Time.MINUTES));
 
   private final boolean enablePreemptor;
+  private final Amount<Long, Time> preemptionDelay;
+  private final ScheduledExecutorService executor;
 
   @VisibleForTesting
-  PreemptorModule(boolean enablePreemptor) {
+  public PreemptorModule(
+      boolean enablePreemptor,
+      Amount<Long, Time> preemptionDelay,
+      ScheduledExecutorService executor) {
+
     this.enablePreemptor = enablePreemptor;
+    this.preemptionDelay = requireNonNull(preemptionDelay);
+    this.executor = requireNonNull(executor);
   }
 
   public PreemptorModule() {
-    this(ENABLE_PREEMPTOR.get());
+    this(
+        ENABLE_PREEMPTOR.get(),
+        PREEMPTION_DELAY.get(),
+        singleThreadLoggingScheduledExecutor("PreemptorProcessor-%d", LOG));
   }
 
   @Override
@@ -72,7 +85,7 @@ public class PreemptorModule extends AbstractModule {
           LOG.info("Preemptor Enabled.");
           bind(ScheduledExecutorService.class)
               .annotatedWith(PreemptorImpl.PreemptionExecutor.class)
-              .toInstance(singleThreadLoggingScheduledExecutor("PreemptorProcessor-%d", LOG));
+              .toInstance(executor);
           bind(PreemptorMetrics.class).in(Singleton.class);
           bind(PreemptionSlotFinder.class).to(PreemptionSlotFinderImpl.class);
           bind(PreemptionSlotFinderImpl.class).in(Singleton.class);
@@ -80,7 +93,7 @@ public class PreemptorModule extends AbstractModule {
           bind(PreemptorImpl.class).in(Singleton.class);
           bind(new TypeLiteral<Amount<Long, Time>>() { })
               .annotatedWith(PreemptorImpl.PreemptionDelay.class)
-              .toInstance(PREEMPTION_DELAY.get());
+              .toInstance(preemptionDelay);
           bind(new TypeLiteral<Amount<Long, Time>>() { })
               .annotatedWith(PreemptionSlotCache.PreemptionSlotHoldDuration.class)
               .toInstance(PREEMPTION_SLOT_HOLD_TIME.get());

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/91aec8c4/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
index c9d10e4..0e2e958 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
@@ -13,6 +13,8 @@
  */
 package org.apache.aurora.scheduler.async.preemptor;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableSet;
@@ -24,6 +26,8 @@ import com.google.inject.Module;
 import com.twitter.common.application.StartupStage;
 import com.twitter.common.application.modules.LifecycleModule;
 import com.twitter.common.base.ExceptionalCommand;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
@@ -71,7 +75,10 @@ public class PreemptorModuleTest extends EasyMockTest {
 
   @Test
   public void testPreemptorDisabled() throws Exception {
-    Injector injector = createInjector(new PreemptorModule(false));
+    Injector injector = createInjector(new PreemptorModule(
+        false,
+        Amount.of(0L, Time.SECONDS),
+        createMock(ScheduledExecutorService.class)));
 
     Supplier<ImmutableSet<IScheduledTask>> taskSupplier =
         createMock(new EasyMockTest.Clazz<Supplier<ImmutableSet<IScheduledTask>>>() { });


Mime
View raw message