aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject incubator-aurora git commit: Implemented TaskScheduler performance benchmarks.
Date Mon, 02 Feb 2015 18:16:23 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 9233abd08 -> f22b538fb


Implemented TaskScheduler performance benchmarks.

Bugs closed: AURORA-969

Reviewed at https://reviews.apache.org/r/28731/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/f22b538f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/f22b538f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/f22b538f

Branch: refs/heads/master
Commit: f22b538fbfc8f2b7132e4c64ecb08d9030fea578
Parents: 9233abd
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Mon Feb 2 10:16:09 2015 -0800
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Mon Feb 2 10:16:09 2015 -0800

----------------------------------------------------------------------
 build.gradle                                    |   6 +-
 .../java/org/apache/aurora/benchmark/Hosts.java |  66 ++++
 .../org/apache/aurora/benchmark/Offers.java     | 104 +++++++
 .../aurora/benchmark/SchedulerBenchmark.java    |  24 --
 .../aurora/benchmark/SchedulingBenchmarks.java  | 311 +++++++++++++++++++
 .../java/org/apache/aurora/benchmark/Tasks.java | 145 +++++++++
 .../aurora/benchmark/fakes/FakeDriver.java      |  51 +++
 .../aurora/benchmark/fakes/FakeEventSink.java   |  24 ++
 .../fakes/FakeRescheduleCalculator.java         |  29 ++
 .../benchmark/fakes/FakeStatsProvider.java      |  73 +++++
 .../aurora/scheduler/async/TaskScheduler.java   |   3 +-
 .../async/preemptor/CachedClusterState.java     |   2 +-
 .../scheduler/async/preemptor/ClusterState.java |   4 +-
 .../async/preemptor/PreemptorImpl.java          |   9 +-
 14 files changed, 819 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f22b538f/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index f9f71a8..d122d2f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -445,9 +445,11 @@ jmh {
   resultsFile = project.file("$buildDir/reports/jmh/results.txt")
 
   // JMH run configuration parameters.
-  iterations = 3
+  iterations = 100
   fork = 1
-  warmupIterations = 1
+  warmupIterations = 10
+  benchmarkMode = 'avgt'
+  timeUnit = 'ns'
 }
 tasks.getByName('jmh').doLast() {
   println "Benchmark report generated: file://$jmhHumanOutputPath"

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f22b538f/src/jmh/java/org/apache/aurora/benchmark/Hosts.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/Hosts.java b/src/jmh/java/org/apache/aurora/benchmark/Hosts.java
new file mode 100644
index 0000000..d95e2dd
--- /dev/null
+++ b/src/jmh/java/org/apache/aurora/benchmark/Hosts.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.benchmark;
+
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+
+/**
+ * Host attribute builder.
+ */
+final class Hosts {
+  private Hosts() {
+    // Utility class.
+  }
+
+  /**
+   * Builds host attributes for the specified configuration.
+   */
+  static final class Builder {
+    private static final String SLAVE_ID_FORMAT = "slave-%s";
+    private static final String HOST_NAME_FORMAT = "host-%s";
+    private static final String RACK_NAME_FORMAT = "rack-%s";
+    private int hostsPerRack = 1;
+
+    Builder setNumHostsPerRack(int newHostsPerRack) {
+      hostsPerRack = newHostsPerRack;
+      return this;
+    }
+
+    Set<IHostAttributes> build(int count) {
+      ImmutableSet.Builder<IHostAttributes> attributes = ImmutableSet.builder();
+      int rackIndex = 0;
+      for (int i = 0; i < count; i++) {
+        attributes.add(IHostAttributes.build(new HostAttributes()
+            .setHost(String.format(HOST_NAME_FORMAT, i))
+            .setSlaveId(String.format(SLAVE_ID_FORMAT, i))
+            .setMode(MaintenanceMode.NONE)
+            .setAttributes(ImmutableSet.of(
+                new Attribute("rack", ImmutableSet.of(String.format(RACK_NAME_FORMAT, rackIndex))),
+                new Attribute("host", ImmutableSet.of(String.format(HOST_NAME_FORMAT, i)))))));
+
+        if (i % hostsPerRack == 0) {
+          rackIndex++;
+        }
+      }
+      return attributes.build();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f22b538f/src/jmh/java/org/apache/aurora/benchmark/Offers.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/Offers.java b/src/jmh/java/org/apache/aurora/benchmark/Offers.java
new file mode 100644
index 0000000..55bc2f7
--- /dev/null
+++ b/src/jmh/java/org/apache/aurora/benchmark/Offers.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.benchmark;
+
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
+
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.async.OfferQueue;
+import org.apache.aurora.scheduler.configuration.Resources;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.mesos.Protos;
+
+/**
+ * Offer factory.
+ */
+final class Offers {
+  private Offers() {
+    // Utility class.
+  }
+
+  /**
+   * Saves offers into the {@link OfferQueue}.
+   *
+   * @param offerQueue {@link OfferQueue} to save into.
+   * @param offers Offers to save.
+   */
+  static void addOffers(OfferQueue offerQueue, Iterable<HostOffer> offers) {
+    for (HostOffer offer : offers) {
+      offerQueue.addOffer(offer);
+    }
+  }
+
+  /**
+   * Builds offers for the specified configuration.
+   */
+  static final class Builder {
+    private static final String OFFER_ID_FORMAT = "offer-%s";
+    private static final String FRAMEWORK_ID = "framework_id";
+
+    private double cpu = 8.0;
+    private Amount<Long, Data> ram = Amount.of(16L, Data.GB);
+    private Amount<Long, Data> disk = Amount.of(256L, Data.GB);
+    private int ports = 1024;
+
+    Builder setCpu(double newCpu) {
+      cpu = newCpu;
+      return this;
+    }
+
+    Builder setRam(Amount<Long, Data> newRam) {
+      ram = newRam;
+      return this;
+    }
+
+    Builder setDisk(Amount<Long, Data> newDisk) {
+      disk = newDisk;
+      return this;
+    }
+
+    Builder setPorts(int newPorts) {
+      ports = newPorts;
+      return this;
+    }
+
+    /**
+     * Builds a set of {@link HostOffer} for the current configuration.
+     *
+     * @param hostAttributes Host attributes to initialize offers from.
+     * @return Set of offers.
+     */
+    Set<HostOffer> build(Set<IHostAttributes> hostAttributes) {
+      ImmutableSet.Builder<HostOffer> offers = ImmutableSet.builder();
+      int id = 0;
+      for (IHostAttributes attributes : hostAttributes) {
+        Protos.Offer offer = Protos.Offer.newBuilder()
+            .addAllResources(new Resources(cpu, ram, disk, ports).toResourceList())
+            .setId(Protos.OfferID.newBuilder().setValue(String.format(OFFER_ID_FORMAT, id++)))
+            .setFrameworkId(Protos.FrameworkID.newBuilder().setValue(FRAMEWORK_ID))
+            .setSlaveId(Protos.SlaveID.newBuilder().setValue(attributes.getSlaveId()))
+            .setHostname(String.format(attributes.getHost()))
+            .build();
+
+        offers.add(new HostOffer(offer, attributes));
+      }
+
+      return offers.build();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f22b538f/src/jmh/java/org/apache/aurora/benchmark/SchedulerBenchmark.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulerBenchmark.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulerBenchmark.java
deleted file mode 100644
index 5cecada..0000000
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulerBenchmark.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.benchmark;
-
-import org.openjdk.jmh.annotations.Benchmark;
-
-public class SchedulerBenchmark {
-
-  @Benchmark
-  public void example() {
-    // TODO(maxim): implement benchmark.
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f22b538f/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
new file mode 100644
index 0000000..8c11ef8
--- /dev/null
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.benchmark;
+
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import javax.inject.Singleton;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.eventbus.EventBus;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.TypeLiteral;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.util.Clock;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.benchmark.fakes.FakeDriver;
+import org.apache.aurora.benchmark.fakes.FakeRescheduleCalculator;
+import org.apache.aurora.benchmark.fakes.FakeStatsProvider;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.TaskIdGenerator;
+import org.apache.aurora.scheduler.async.OfferQueue;
+import org.apache.aurora.scheduler.async.RescheduleCalculator;
+import org.apache.aurora.scheduler.async.TaskScheduler;
+import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl.ReservationDuration;
+import org.apache.aurora.scheduler.async.preemptor.CachedClusterState;
+import org.apache.aurora.scheduler.async.preemptor.ClusterState;
+import org.apache.aurora.scheduler.async.preemptor.Preemptor;
+import org.apache.aurora.scheduler.async.preemptor.PreemptorImpl;
+import org.apache.aurora.scheduler.configuration.Resources;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
+import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.mesos.ExecutorSettings;
+import org.apache.aurora.scheduler.state.StateModule;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.mem.MemStorage;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+
+/**
+ * Performance benchmarks for the task scheduling loop.
+ */
+public class SchedulingBenchmarks {
+
+  /**
+   * Constructs scheduler objects and populates offers/tasks for the benchmark run.
+   */
+  @State(Scope.Thread)
+  public abstract static class AbstractBase {
+    protected Storage storage;
+    protected TaskScheduler taskScheduler;
+    protected OfferQueue offerQueue;
+    protected IScheduledTask task;
+    protected Set<HostOffer> offers;
+    protected Set<IHostAttributes> hostAttributes;
+    protected EventBus eventBus;
+
+    /**
+     * Runs once to setup up benchmark state.
+     */
+    @Setup(Level.Trial)
+    public void prepare() {
+      storage = MemStorage.newEmptyStorage();
+      eventBus = new EventBus();
+      final FakeClock clock = new FakeClock();
+      clock.setNowMillis(System.currentTimeMillis());
+
+      Injector injector = Guice.createInjector(
+          new StateModule(),
+          new AbstractModule() {
+            @Override
+            protected void configure() {
+              final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
+                  1,
+                  new ThreadFactoryBuilder()
+                      .setDaemon(true)
+                      .setNameFormat("TestProcessor-%d").build());
+              bind(ScheduledExecutorService.class).toInstance(executor);
+
+              bind(OfferQueue.class).to(OfferQueue.OfferQueueImpl.class);
+              bind(OfferQueue.OfferQueueImpl.class).in(Singleton.class);
+              bind(OfferQueue.OfferReturnDelay.class).toInstance(
+                  new OfferQueue.OfferReturnDelay() {
+                    @Override
+                    public Amount<Long, Time> get() {
+                      return Amount.of(30L, Time.DAYS);
+                    }
+                  });
+
+              bind(new TypeLiteral<Amount<Long, Time>>() { })
+                  .annotatedWith(ReservationDuration.class)
+                  .toInstance(Amount.of(30L, Time.DAYS));
+              bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class);
+              bind(TaskScheduler.TaskSchedulerImpl.class).in(Singleton.class);
+              bind(TaskIdGenerator.class).to(TaskIdGenerator.TaskIdGeneratorImpl.class);
+              bind(SchedulingFilter.class).to(SchedulingFilterImpl.class);
+              bind(SchedulingFilterImpl.class).in(Singleton.class);
+              bind(ExecutorSettings.class)
+                  .toInstance(new ExecutorSettings(
+                      "/executor/thermos",
+                      ImmutableList.<String>of(),
+                      "/var/run/thermos",
+                      Optional.<String>absent(),
+                      new Resources(0.0, Amount.of(0L, Data.MB), Amount.of(0L, Data.MB),
0)));
+
+              bind(Preemptor.class).to(PreemptorImpl.class);
+              bind(PreemptorImpl.class).in(Singleton.class);
+              bind(new TypeLiteral<Amount<Long, Time>>() { })
+                  .annotatedWith(PreemptorImpl.PreemptionDelay.class)
+                  .toInstance(Amount.of(0L, Time.MILLISECONDS));
+              bind(ClusterState.class).to(CachedClusterState.class);
+              bind(CachedClusterState.class).in(Singleton.class);
+
+              bind(Storage.class).toInstance(storage);
+              bind(Driver.class).toInstance(new FakeDriver());
+              bind(RescheduleCalculator.class).toInstance(new FakeRescheduleCalculator());
+              bind(Clock.class).toInstance(clock);
+              bind(StatsProvider.class).toInstance(new FakeStatsProvider());
+              bind(EventSink.class).toInstance(new EventSink() {
+                @Override
+                public void post(PubsubEvent event) {
+                  eventBus.post(event);
+                }
+              });
+            }
+          }
+      );
+
+      taskScheduler = injector.getInstance(TaskScheduler.class);
+      offerQueue = injector.getInstance(OfferQueue.class);
+      eventBus.register(injector.getInstance(CachedClusterState.class));
+
+      hostAttributes = createHostAttributes();
+      saveHostAttributes(hostAttributes);
+
+      offers = createOffers(hostAttributes);
+      Offers.addOffers(offerQueue, offers);
+      fillUpCluster();
+
+      task = createTask();
+      saveTasks(ImmutableSet.of(task));
+    }
+
+    private void fillUpCluster() {
+      int numOffersToFill = (int) Math.round(offers.size() * getClusterUtilization());
+      Set<IScheduledTask> tasksToAssign = new Tasks.Builder().build(numOffersToFill);
+      saveTasks(tasksToAssign);
+      for (IScheduledTask scheduledTask : tasksToAssign) {
+        taskScheduler.schedule(scheduledTask.getAssignedTask().getTaskId());
+      }
+    }
+
+    private void saveTasks(final Set<IScheduledTask> tasks) {
+      storage.write(new Storage.MutateWork.NoResult.Quiet() {
+        @Override
+        protected void execute(Storage.MutableStoreProvider storeProvider) {
+          storeProvider.getUnsafeTaskStore().saveTasks(tasks);
+        }
+      });
+    }
+
+    private void saveHostAttributes(final Set<IHostAttributes> hostAttributesToSave)
{
+      storage.write(new Storage.MutateWork.NoResult.Quiet() {
+        @Override
+        protected void execute(Storage.MutableStoreProvider storeProvider) {
+          for (IHostAttributes attributes : hostAttributesToSave) {
+            storeProvider.getAttributeStore().saveHostAttributes(attributes);
+          }
+        }
+      });
+    }
+
+    protected abstract double getClusterUtilization();
+
+    protected abstract Set<IHostAttributes> createHostAttributes();
+
+    protected abstract Set<HostOffer> createOffers(Set<IHostAttributes> attributes);
+
+    protected abstract IScheduledTask createTask();
+
+    /**
+     * Benchmark entry point. All settings (e.g. iterations, benchmarkMode and etc.) are
defined
+     * in build.gradle.
+     *
+     * @return A "blackhole" to make sure the result is not optimized out.
+     * See {@see http://openjdk.java.net/projects/code-tools/jmh/} for more info.
+     */
+    @Benchmark
+    public boolean runBenchmark() {
+      return taskScheduler.schedule(task.getAssignedTask().getTaskId());
+    }
+  }
+
+  /**
+   * Tests scheduling performance with a task vetoed due to insufficient CPU.
+   */
+  public static class InsufficientResourcesSchedulingBenchmark extends AbstractBase {
+    @Override
+    protected Set<IHostAttributes> createHostAttributes() {
+      return new Hosts.Builder().setNumHostsPerRack(2).build(1000);
+    }
+
+    @Override
+    protected Set<HostOffer> createOffers(Set<IHostAttributes> hostAttributes)
{
+      return new Offers.Builder().build(hostAttributes);
+    }
+
+    @Override
+    protected double getClusterUtilization() {
+      return 0.9;
+    }
+
+    @Override
+    protected IScheduledTask createTask() {
+      return Iterables.getOnlyElement(new Tasks.Builder()
+          .setProduction(true)
+          .setCpu(32)
+          .setTaskIdFormat("test-%s")
+          .build(1));
+    }
+  }
+
+  /**
+   * Tests scheduling performance with a task vetoed due to constraint mismatch.
+   */
+  public static class ConstraintMismatchsSchedulingBenchmark extends AbstractBase {
+    @Override
+    protected Set<IHostAttributes> createHostAttributes() {
+      return new Hosts.Builder().setNumHostsPerRack(2).build(1000);
+    }
+
+    @Override
+    protected Set<HostOffer> createOffers(Set<IHostAttributes> hostAttributes)
{
+      return new Offers.Builder().build(hostAttributes);
+    }
+
+    @Override
+    protected double getClusterUtilization() {
+      return 0.9;
+    }
+
+    @Override
+    protected IScheduledTask createTask() {
+      return Iterables.getOnlyElement(new Tasks.Builder()
+          .setProduction(true)
+          .addValueConstraint("host", "denied")
+          .setTaskIdFormat("test-%s")
+          .build(1));
+    }
+  }
+
+  /**
+   * Tests scheduling performance with a large number of tasks and slaves where the cluster
+   * is completely filled up and preemptor is invoked for all slaves in the cluster.
+   */
+  public static class PreemptorFallbackForLargeClusterBenchmark extends AbstractBase {
+    @Override
+    protected Set<IHostAttributes> createHostAttributes() {
+      return new Hosts.Builder().setNumHostsPerRack(2).build(10000);
+    }
+
+    @Override
+    protected Set<HostOffer> createOffers(Set<IHostAttributes> hostAttributes)
{
+      return new Offers.Builder().build(hostAttributes);
+    }
+
+    @Override
+    protected double getClusterUtilization() {
+      return 1.0;
+    }
+
+    @Override
+    protected IScheduledTask createTask() {
+      return Iterables.getOnlyElement(new Tasks.Builder()
+          .setProduction(true)
+          .addValueConstraint("host", "denied")
+          .setTaskIdFormat("test-%s")
+          .build(1));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f22b538f/src/jmh/java/org/apache/aurora/benchmark/Tasks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/Tasks.java b/src/jmh/java/org/apache/aurora/benchmark/Tasks.java
new file mode 100644
index 0000000..1a35f9e
--- /dev/null
+++ b/src/jmh/java/org/apache/aurora/benchmark/Tasks.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.benchmark;
+
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Constraint;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.LimitConstraint;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.TaskConstraint;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.gen.ValueConstraint;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+
+/**
+ * Task factory.
+ */
+final class Tasks {
+
+  private Tasks() {
+    // Utility class.
+  }
+
+  /**
+   * Builds tasks for the specified configuration.
+   */
+  static final class Builder {
+    private static final JobKey JOB_KEY = new JobKey("jmh", "dev", "benchmark");
+    private static final String USER_FORMAT = "user-%s";
+
+    private String taskIdFormat = "default_task-%s";
+    private boolean isProduction = false;
+    private double cpu = 6.0;
+    private Amount<Long, Data> ram = Amount.of(8L, Data.GB);
+    private Amount<Long, Data> disk = Amount.of(128L, Data.GB);
+    private ScheduleStatus scheduleStatus = ScheduleStatus.PENDING;
+    private ImmutableSet.Builder<Constraint> constraints = ImmutableSet.builder();
+
+    Builder setTaskIdFormat(String newTaskIdFormat) {
+      taskIdFormat = newTaskIdFormat;
+      return this;
+    }
+
+    Builder setCpu(double newCpu) {
+      cpu = newCpu;
+      return this;
+    }
+
+    Builder setRam(Amount<Long, Data> newRam) {
+      ram = newRam;
+      return this;
+    }
+
+    Builder setDisk(Amount<Long, Data> newDisk) {
+      disk = newDisk;
+      return this;
+    }
+
+    Builder setScheduleStatus(ScheduleStatus newScheduleStatus) {
+      scheduleStatus = newScheduleStatus;
+      return this;
+    }
+
+    Builder setProduction(boolean newProduction) {
+      isProduction = newProduction;
+      return this;
+    }
+
+    Builder addValueConstraint(String name, String value) {
+      constraints.add(new Constraint()
+          .setName(name)
+          .setConstraint(TaskConstraint.value(new ValueConstraint()
+              .setNegated(false)
+              .setValues(ImmutableSet.of(value)))));
+
+      return this;
+    }
+
+    Builder addLimitConstraint(String name, int limit) {
+      constraints.add(new Constraint()
+          .setName(name)
+          .setConstraint(TaskConstraint.limit(new LimitConstraint()
+              .setLimit(limit))));
+
+      return this;
+    }
+
+    /**
+     * Builds a set of {@link IScheduledTask} for the current configuration.
+     *
+     * @param count Number of tasks to build.
+     * @return Set of tasks.
+     */
+    Set<IScheduledTask> build(int count) {
+      ImmutableSet.Builder<IScheduledTask> tasks = ImmutableSet.builder();
+
+      for (int i = 0; i < count; i++) {
+        String taskId = String.format(taskIdFormat, i);
+
+        tasks.add(IScheduledTask.build(new ScheduledTask()
+            .setTaskEvents(Lists.newArrayList(new TaskEvent(0, ScheduleStatus.PENDING)))
+            .setStatus(scheduleStatus)
+            .setAssignedTask(new AssignedTask()
+                .setInstanceId(i)
+                .setTaskId(taskId)
+                .setTask(new TaskConfig()
+                    .setConstraints(constraints.build())
+                    .setNumCpus(cpu)
+                    .setRamMb(ram.as(Data.MB))
+                    .setDiskMb(disk.as(Data.MB))
+                    .setProduction(isProduction)
+                    .setRequestedPorts(ImmutableSet.<String>of())
+                    .setJob(JOB_KEY)
+                    .setJobName(JOB_KEY.getName())
+                    .setEnvironment(JOB_KEY.getEnvironment())
+                    .setOwner(new Identity()
+                        .setRole(JOB_KEY.getRole())
+                        .setUser(String.format(USER_FORMAT, taskId)))))));
+      }
+
+      return tasks.build();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f22b538f/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java
new file mode 100644
index 0000000..45de15a
--- /dev/null
+++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.benchmark.fakes;
+
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.mesos.Protos;
+
+public class FakeDriver extends AbstractIdleService implements Driver {
+  @Override
+  public void blockUntilStopped() {
+    // no-op
+  }
+
+  @Override
+  public void launchTask(Protos.OfferID offerId, Protos.TaskInfo task) {
+    // no-op
+  }
+
+  @Override
+  public void declineOffer(Protos.OfferID offerId) {
+    // no-op
+  }
+
+  @Override
+  public void killTask(String taskId) {
+    // no-op
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    // no-op
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    // no-op
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f22b538f/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeEventSink.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeEventSink.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeEventSink.java
new file mode 100644
index 0000000..db37f1f
--- /dev/null
+++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeEventSink.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.benchmark.fakes;
+
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+
+public class FakeEventSink implements EventSink {
+  @Override
+  public void post(PubsubEvent event) {
+    // no-op
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f22b538f/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeRescheduleCalculator.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeRescheduleCalculator.java
b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeRescheduleCalculator.java
new file mode 100644
index 0000000..6d71012
--- /dev/null
+++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeRescheduleCalculator.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.benchmark.fakes;
+
+import org.apache.aurora.scheduler.async.RescheduleCalculator;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+
+public class FakeRescheduleCalculator implements RescheduleCalculator {
+  @Override
+  public long getStartupScheduleDelayMs(IScheduledTask task) {
+    return 0;
+  }
+
+  @Override
+  public long getFlappingPenaltyMs(IScheduledTask task) {
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f22b538f/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeStatsProvider.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeStatsProvider.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeStatsProvider.java
new file mode 100644
index 0000000..3413b0a
--- /dev/null
+++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeStatsProvider.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.benchmark.fakes;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Supplier;
+
+import com.twitter.common.stats.Stat;
+import com.twitter.common.stats.StatsProvider;
+
+public class FakeStatsProvider implements StatsProvider {
+  @Override
+  public AtomicLong makeCounter(String name) {
+    return new AtomicLong();
+  }
+
+  @Override
+  public <T extends Number> Stat<T> makeGauge(final String name, final Supplier<T>
gauge) {
+    return new Stat<T>() {
+      @Override
+      public String getName() {
+        return name;
+      }
+
+      @Override
+      public T read() {
+        return gauge.get();
+      }
+    };
+  }
+
+  @Override
+  public StatsProvider untracked() {
+    return this;
+  }
+
+  @Override
+  public RequestTimer makeRequestTimer(String name) {
+    return new RequestTimer() {
+      @Override
+      public void requestComplete(long latencyMicros) {
+        // no-op
+      }
+
+      @Override
+      public void incErrors() {
+        // no-op
+      }
+
+      @Override
+      public void incReconnects() {
+        // no-op
+      }
+
+      @Override
+      public void incTimeouts() {
+        // no-op
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f22b538f/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index b6402ae..ce47ff1 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -94,9 +94,10 @@ public interface TaskScheduler extends EventSubscriber {
     /**
      * Binding annotation for the time duration of reservations.
      */
+    @VisibleForTesting
     @Qualifier
     @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-    @interface ReservationDuration { }
+    public @interface ReservationDuration { }
 
     private static final Logger LOG = Logger.getLogger(TaskSchedulerImpl.class.getName());
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f22b538f/src/main/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterState.java
b/src/main/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterState.java
index 03c2a8f..2831103 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterState.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterState.java
@@ -25,7 +25,7 @@ import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 /**
  * A cached view of cluster state, kept up to date by pubsub notifications.
  */
-class CachedClusterState implements ClusterState, PubsubEvent.EventSubscriber {
+public class CachedClusterState implements ClusterState, PubsubEvent.EventSubscriber {
 
   private final Multimap<String, PreemptionVictim> victims =
       Multimaps.synchronizedMultimap(HashMultimap.<String, PreemptionVictim>create());

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f22b538f/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
index f7e157c..38610b2 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
@@ -13,12 +13,14 @@
  */
 package org.apache.aurora.scheduler.async.preemptor;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Multimap;
 
 /**
  * A facade for the preemptor to gain access to the state of scheduled tasks in the cluster.
  */
-interface ClusterState {
+@VisibleForTesting
+public interface ClusterState {
 
   /**
    * Gets a snapshot of the active tasks in the cluster, indexed by the slave IDs they are

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f22b538f/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java
b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java
index 10c4f06..6e89075 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.inject.Inject;
 import javax.inject.Qualifier;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
@@ -73,15 +74,17 @@ import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
  * To avoid excessive churn, the preemptor requires that a task is PENDING for a duration
  * (dictated by {@link #preemptionCandidacyDelay}) before it becomes eligible to preempt
other
  */
-class PreemptorImpl implements Preemptor {
+@VisibleForTesting
+public class PreemptorImpl implements Preemptor {
 
   /**
    * Binding annotation for the time interval after which a pending task becomes eligible
to
    * preempt other tasks.
    */
+  @VisibleForTesting
   @Qualifier
   @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-  @interface PreemptionDelay { }
+  public @interface PreemptionDelay { }
 
   private final AtomicLong tasksPreempted = Stats.exportLong("preemptor_tasks_preempted");
   // Incremented every time the preemptor is invoked and finds tasks pending and preemptable
tasks
@@ -119,7 +122,7 @@ class PreemptorImpl implements Preemptor {
    * @param clock Clock to check current time.
    */
   @Inject
-  public PreemptorImpl(
+  PreemptorImpl(
       Storage storage,
       StateManager stateManager,
       OfferQueue offerQueue,


Mime
View raw message