aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject aurora git commit: Added a status update throughput benchmark.
Date Tue, 05 May 2015 22:42:45 GMT
Repository: aurora
Updated Branches:
  refs/heads/master d3b6e13df -> 751d65f15


Added a status update throughput benchmark.

Bugs closed: AURORA-1283

Reviewed at https://reviews.apache.org/r/33608/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/751d65f1
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/751d65f1
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/751d65f1

Branch: refs/heads/master
Commit: 751d65f15068954e9b8774890920cd32a8b471b2
Parents: d3b6e13
Author: Ben Mahler <benjamin.mahler@gmail.com>
Authored: Tue May 5 15:37:50 2015 -0700
Committer: Bill Farner <wfarner@apache.org>
Committed: Tue May 5 15:37:50 2015 -0700

----------------------------------------------------------------------
 .../aurora/benchmark/StatusUpdateBenchmark.java | 299 +++++++++++++++++++
 .../benchmark/fakes/FakeOfferManager.java       |  58 ++++
 .../benchmark/fakes/FakeSchedulerDriver.java    | 130 ++++++++
 .../aurora/scheduler/UserTaskLauncher.java      |   3 +-
 .../scheduler/mesos/SchedulerDriverModule.java  |   1 +
 5 files changed, 490 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/751d65f1/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
new file mode 100644
index 0000000..7bb64dd
--- /dev/null
+++ b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
@@ -0,0 +1,299 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.benchmark;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Singleton;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Provides;
+
+import com.twitter.common.application.ShutdownStage;
+import com.twitter.common.base.Command;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.util.Clock;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.benchmark.fakes.FakeOfferManager;
+import org.apache.aurora.benchmark.fakes.FakeRescheduleCalculator;
+import org.apache.aurora.benchmark.fakes.FakeSchedulerDriver;
+import org.apache.aurora.benchmark.fakes.FakeStatsProvider;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.TaskIdGenerator;
+import org.apache.aurora.scheduler.TaskLauncher;
+import org.apache.aurora.scheduler.UserTaskLauncher;
+import org.apache.aurora.scheduler.async.OfferManager;
+import org.apache.aurora.scheduler.async.RescheduleCalculator;
+import org.apache.aurora.scheduler.async.preemptor.ClusterStateImpl;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
+import org.apache.aurora.scheduler.mesos.DriverFactory;
+import org.apache.aurora.scheduler.mesos.DriverSettings;
+import org.apache.aurora.scheduler.mesos.ExecutorSettings;
+import org.apache.aurora.scheduler.mesos.SchedulerDriverModule;
+import org.apache.aurora.scheduler.state.StateModule;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.db.DbUtil;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Performance benchmarks for status update processing throughput. Note that we
+ * need to send many updates and wait for all transitions to occur within one run
+ * of the benchmark. This is because we don't want to assume that status updates
+ * are processed synchronously.
+ */
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 10, timeUnit = TimeUnit.SECONDS)
+@Fork(1)
+@Threads(1)
+@State(Scope.Thread)
+public class StatusUpdateBenchmark {
+  /**
+   * Simulates a slow storage backend by introducing latency on top of an
+   * underlying storage implementation.
+   *
+   * TODO(bmahler): Consider specifying read and write latency separately.
+   */
+  private static final class SlowStorageWrapper implements Storage {
+    private final Storage underlyingStorage;
+    private Optional<Amount<Long, Time>> latency = Optional.absent();
+
+    private SlowStorageWrapper(Storage underlyingStorage) {
+      this.underlyingStorage = requireNonNull(underlyingStorage);
+    }
+
+    private void setLatency(Amount<Long, Time> latency) {
+      this.latency = Optional.of(latency);
+    }
+
+    private void maybeSleep() {
+      if (latency.isPresent()) {
+        Uninterruptibles.sleepUninterruptibly(
+            latency.get().getValue(),
+            latency.get().getUnit().getTimeUnit());
+      }
+    }
+
+    @Override
+    public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E {
+      maybeSleep();
+      return underlyingStorage.read(work);
+    }
+
+    @Override
+    public <T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E {
+      maybeSleep();
+      return underlyingStorage.write(work);
+    }
+
+    @Override
+    public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work)
+        throws StorageException, E {
+
+      maybeSleep();
+      underlyingStorage.bulkLoad(work);
+    }
+
+    @Override
+    public void prepare() throws StorageException {
+      underlyingStorage.prepare();
+    }
+  }
+
+  // Benchmark with 1000 tasks to easily observe the kilo-qps of status
+  // update processing. Consider varying this number if needed.
+  private static final int NUM_TASKS = 1000;
+
+  // Vary the storage latency to observe the effect on throughput.
+  @Param({"5", "25", "100"})
+  private long latencyMilliseconds;
+
+  private SchedulerDriver driver;
+  private Scheduler scheduler;
+  private SlowStorageWrapper storage;
+  private EventBus eventBus;
+  private Set<IScheduledTask> tasks;
+  private CountDownLatch countDownLatch;
+
+  /**
+   * Run once per trial to set up the benchmark.
+   */
+  @Setup(Level.Trial)
+  public void setUpBenchmark() {
+    eventBus = new EventBus();
+    storage = new SlowStorageWrapper(DbUtil.createStorage());
+
+    Injector injector = Guice.createInjector(
+        new StateModule(),
+        new SchedulerDriverModule(),
+        new AbstractModule() {
+          @Override
+          protected void configure() {
+            bind(DriverFactory.class).toInstance(new DriverFactory() {
+              @Override
+              public SchedulerDriver create(
+                  Scheduler s,
+                  Optional<Protos.Credential> credentials,
+                  Protos.FrameworkInfo frameworkInfo,
+                  String master) {
+
+                return new FakeSchedulerDriver();
+              }
+            });
+            bind(OfferManager.class).toInstance(new FakeOfferManager());
+            bind(TaskIdGenerator.class).to(TaskIdGenerator.TaskIdGeneratorImpl.class);
+            bind(SchedulingFilter.class).to(SchedulingFilterImpl.class);
+            bind(Command.class).annotatedWith(ShutdownStage.class).toInstance(
+                new Command() {
+                  @Override
+                  public void execute() throws RuntimeException {
+                    // no-op
+                  }
+                });
+            bind(Thread.UncaughtExceptionHandler.class).toInstance(
+                new Thread.UncaughtExceptionHandler() {
+                  @Override
+                  public void uncaughtException(Thread t, Throwable e) {
+                    // no-op
+                  }
+                });
+            bind(Storage.class).toInstance(storage);
+            bind(DriverSettings.class).toInstance(
+                new DriverSettings(
+                    "fakemaster",
+                    Optional.<Protos.Credential>absent(),
+                    Protos.FrameworkInfo.newBuilder()
+                        .setUser("framework user")
+                        .setName("test framework")
+                        .build()));
+            bind(RescheduleCalculator.class).toInstance(new FakeRescheduleCalculator());
+            bind(Clock.class).toInstance(new FakeClock());
+            bind(ExecutorSettings.class)
+                .toInstance(ExecutorSettings.newBuilder()
+                    .setExecutorPath("/executor/thermos")
+                    .setThermosObserverRoot("/var/run/thermos")
+                    .build());
+            bind(StatsProvider.class).toInstance(new FakeStatsProvider());
+            bind(EventSink.class).toInstance(new EventSink() {
+              @Override
+              public void post(PubsubEvent event) {
+                eventBus.post(event);
+              }
+            });
+          }
+
+          @Provides
+          @Singleton
+          List<TaskLauncher> provideTaskLaunchers(
+              UserTaskLauncher userTaskLauncher) {
+            return ImmutableList.<TaskLauncher>of(userTaskLauncher);
+          }
+        }
+    );
+
+    eventBus.register(injector.getInstance(ClusterStateImpl.class));
+    scheduler = injector.getInstance(Scheduler.class);
+    eventBus.register(this);
+  }
+
+  /**
+   * Runs before each iteration of the benchmark in order to vary the storage
+   * latency across iterations, based on the latency parameter.
+   */
+  @Setup(Level.Iteration)
+  public void setIterationLatency() {
+    storage.setLatency(Amount.of(latencyMilliseconds, Time.MILLISECONDS));
+  }
+
+  /**
+   * Runs before each invocation of the benchmark in order to store the tasks
+   * that we will transition in the benchmark.
+   */
+  @Setup(Level.Invocation)
+  public void createTasks() {
+    tasks = new Tasks.Builder()
+        .setScheduleStatus(ScheduleStatus.STARTING)
+        .build(NUM_TASKS);
+
+    storage.write(new Storage.MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(Storage.MutableStoreProvider storeProvider) {
+        storeProvider.getUnsafeTaskStore().saveTasks(tasks);
+      }
+    });
+
+    countDownLatch = new CountDownLatch(tasks.size());
+  }
+
+  @Subscribe
+  public void taskChangedState(PubsubEvent.TaskStateChange stateChange) {
+    countDownLatch.countDown();
+  }
+
+  @Benchmark
+  public boolean runBenchmark() throws InterruptedException {
+    for (String taskId : org.apache.aurora.scheduler.base.Tasks.ids(tasks)) {
+      Protos.TaskStatus status = Protos.TaskStatus.newBuilder()
+          .setState(Protos.TaskState.TASK_RUNNING)
+          .setSource(Protos.TaskStatus.Source.SOURCE_EXECUTOR)
+          .setMessage("message")
+          .setTimestamp(1D)
+          .setTaskId(Protos.TaskID.newBuilder().setValue(taskId).build())
+          .build();
+
+      scheduler.statusUpdate(new FakeSchedulerDriver(), status);
+    }
+
+    // Wait for all task transitions to complete.
+    countDownLatch.await();
+
+    // Return an unguessable value.
+    return System.currentTimeMillis() % 5 == 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/751d65f1/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
new file mode 100644
index 0000000..45849b5
--- /dev/null
+++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.benchmark.fakes;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.async.OfferManager;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.state.TaskAssigner;
+import org.apache.mesos.Protos;
+
+public class FakeOfferManager implements OfferManager {
+  @Override
+  public void addOffer(HostOffer offer) {
+    // no-op
+  }
+
+  @Override
+  public void cancelOffer(Protos.OfferID offer) {
+    // no-op
+  }
+
+  @Override
+  public boolean launchFirst(
+      Function<HostOffer, TaskAssigner.Assignment> acceptor,
+      TaskGroupKey groupKey) throws LaunchException {
+    return false;
+  }
+
+  @Override
+  public void hostAttributesChanged(PubsubEvent.HostAttributesChanged change) {
+    // no-op
+  }
+
+  @Override
+  public Iterable<HostOffer> getOffers() {
+    return null;
+  }
+
+  @Override
+  public Optional<HostOffer> getOffer(Protos.SlaveID slaveId) {
+    return Optional.absent();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/751d65f1/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeSchedulerDriver.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeSchedulerDriver.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeSchedulerDriver.java
new file mode 100644
index 0000000..23c0e53
--- /dev/null
+++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeSchedulerDriver.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.benchmark.fakes;
+
+import java.util.Collection;
+
+import org.apache.mesos.Protos;
+import org.apache.mesos.SchedulerDriver;
+
+public class FakeSchedulerDriver implements SchedulerDriver {
+  @Override
+  public Protos.Status start() {
+    return null;
+  }
+
+  @Override
+  public Protos.Status stop(boolean failover) {
+    return null;
+  }
+
+  @Override
+  public Protos.Status stop() {
+    return null;
+  }
+
+  @Override
+  public Protos.Status abort() {
+    return null;
+  }
+
+  @Override
+  public Protos.Status join() {
+    return null;
+  }
+
+  @Override
+  public Protos.Status run() {
+    return null;
+  }
+
+  @Override
+  public Protos.Status requestResources(
+      Collection<Protos.Request> requests) {
+    return null;
+  }
+
+  @Override
+  public Protos.Status launchTasks(
+      Collection<Protos.OfferID> offerIds,
+      Collection<Protos.TaskInfo> tasks,
+      Protos.Filters filters) {
+    return null;
+  }
+
+  @Override
+  public Protos.Status launchTasks(
+      Collection<Protos.OfferID> offerIds,
+      Collection<Protos.TaskInfo> tasks) {
+    return null;
+  }
+
+  @Override
+  public Protos.Status launchTasks(
+      Protos.OfferID offerId,
+      Collection<Protos.TaskInfo> tasks,
+      Protos.Filters filters) {
+    return null;
+  }
+
+  @Override
+  public Protos.Status launchTasks(
+      Protos.OfferID offerId,
+      Collection<Protos.TaskInfo> tasks) {
+    return null;
+  }
+
+  @Override
+  public Protos.Status killTask(
+      Protos.TaskID taskId) {
+    return null;
+  }
+
+  @Override
+  public Protos.Status declineOffer(
+      Protos.OfferID offerId,
+      Protos.Filters filters) {
+    return null;
+  }
+
+  @Override
+  public Protos.Status declineOffer(
+      Protos.OfferID offerId) {
+    return null;
+  }
+
+  @Override
+  public Protos.Status reviveOffers() {
+    return null;
+  }
+
+  @Override
+  public Protos.Status acknowledgeStatusUpdate(
+      Protos.TaskStatus status) {
+    return null;
+  }
+
+  @Override
+  public Protos.Status sendFrameworkMessage(
+      Protos.ExecutorID executorId,
+      Protos.SlaveID slaveId, byte[] data) {
+    return null;
+  }
+
+  @Override
+  public Protos.Status reconcileTasks(
+      Collection<Protos.TaskStatus> statuses) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/751d65f1/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
index c54619f..0ce9c9d 100644
--- a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
@@ -36,7 +36,8 @@ import static java.util.Objects.requireNonNull;
 /**
  * A task launcher that matches resource offers against user tasks.
  */
-class UserTaskLauncher implements TaskLauncher {
+@VisibleForTesting
+public class UserTaskLauncher implements TaskLauncher {
 
   private static final Logger LOG = Logger.getLogger(UserTaskLauncher.class.getName());
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/751d65f1/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
index d7d659b..acf48cf 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
@@ -42,6 +42,7 @@ public class SchedulerDriverModule extends AbstractModule {
 
         bind(Scheduler.class).to(MesosSchedulerImpl.class);
         bind(MesosSchedulerImpl.class).in(Singleton.class);
+        expose(Scheduler.class);
 
         // TODO(zmanji): Create singleThreadedExecutor (non-scheduled) variant.
         bind(Executor.class).annotatedWith(MesosSchedulerImpl.SchedulerExecutor.class)


Mime
View raw message