aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kevi...@apache.org
Subject incubator-aurora git commit: Remove SchedulerActive, replace with explicit services.
Date Fri, 07 Nov 2014 04:06:52 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 5250a94da -> 2d68bc690


Remove SchedulerActive, replace with explicit services.

This is the first part of a larger change that will allow us to
make pubsub events async as it makes the dependency on subscribers
processing a SchedulerActive event explicit.

Testing Done:
./gradlew -Pq build
test_end_to_end.sh

Bugs closed: AURORA-920

Reviewed at https://reviews.apache.org/r/27716/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/2d68bc69
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/2d68bc69
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/2d68bc69

Branch: refs/heads/master
Commit: 2d68bc69089ae2c65a4839ed799a742bcfc3561e
Parents: 5250a94
Author: Kevin Sweeney <kevints@apache.org>
Authored: Thu Nov 6 20:06:05 2014 -0800
Committer: Kevin Sweeney <kevints@apache.org>
Committed: Thu Nov 6 20:06:05 2014 -0800

----------------------------------------------------------------------
 src/main/java/org/apache/aurora/GuavaUtils.java | 73 ++++++++++++++++
 .../aurora/scheduler/SchedulerLifecycle.java    | 92 ++++++++------------
 .../aurora/scheduler/SchedulerModule.java       | 31 +++++++
 .../org/apache/aurora/scheduler/TaskVars.java   | 13 ++-
 .../scheduler/cron/quartz/CronLifecycle.java    | 32 +------
 .../scheduler/cron/quartz/CronModule.java       |  4 +-
 .../aurora/scheduler/events/PubsubEvent.java    | 11 ---
 .../apache/aurora/scheduler/sla/SlaModule.java  | 23 +++--
 .../updater/JobUpdateEventSubscriber.java       | 13 ++-
 .../scheduler/SchedulerLifecycleTest.java       | 34 ++++----
 .../apache/aurora/scheduler/TaskVarsTest.java   |  3 +-
 .../aurora/scheduler/cron/quartz/CronIT.java    | 34 ++------
 .../aurora/scheduler/sla/SlaModuleTest.java     |  9 +-
 .../updater/JobUpdateEventSubscriberTest.java   | 11 ++-
 .../aurora/scheduler/updater/JobUpdaterIT.java  |  5 +-
 15 files changed, 213 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2d68bc69/src/main/java/org/apache/aurora/GuavaUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/GuavaUtils.java b/src/main/java/org/apache/aurora/GuavaUtils.java
new file mode 100644
index 0000000..f3fa8cd
--- /dev/null
+++ b/src/main/java/org/apache/aurora/GuavaUtils.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.util.concurrent.ServiceManager;
+
+/**
+ * Utilities for working with Guava.
+ */
+public final class GuavaUtils {
+  private GuavaUtils() {
+    // Utility class.
+  }
+
+  /**
+   * Interface for mocking. The Guava ServiceManager class is final.
+   */
+  public interface ServiceManagerIface {
+    ServiceManagerIface startAsync();
+
+    void awaitHealthy();
+
+    ServiceManagerIface stopAsync();
+
+    void awaitStopped(long timeout, TimeUnit unit) throws TimeoutException;
+  }
+
+  /**
+   * Create a new {@link ServiceManagerIface} that wraps a {@link ServiceManager}.
+   *
+   * @param delegate Service manager to delegate to.
+   * @return A wrapper.
+   */
+  public static ServiceManagerIface serviceManager(final ServiceManager delegate) {
+    return new ServiceManagerIface() {
+      @Override
+      public ServiceManagerIface startAsync() {
+        delegate.startAsync();
+        return this;
+      }
+
+      @Override
+      public void awaitHealthy() {
+        delegate.awaitHealthy();
+      }
+
+      @Override
+      public ServiceManagerIface stopAsync() {
+        delegate.stopAsync();
+        return this;
+      }
+
+      @Override
+      public void awaitStopped(long timeout, TimeUnit unit) throws TimeoutException {
+        delegate.awaitStopped(timeout, unit);
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2d68bc69/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
index 823ff75..453f22a 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
@@ -13,7 +13,13 @@
  */
 package org.apache.aurora.scheduler;
 
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.logging.Level;
@@ -21,6 +27,7 @@ import java.util.logging.Logger;
 
 import javax.annotation.Nullable;
 import javax.inject.Inject;
+import javax.inject.Qualifier;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
@@ -28,6 +35,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
 import com.google.common.eventbus.Subscribe;
 import com.google.common.util.concurrent.Atomics;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -35,7 +43,7 @@ import com.twitter.common.application.Lifecycle;
 import com.twitter.common.application.ShutdownRegistry;
 import com.twitter.common.base.Closure;
 import com.twitter.common.base.Closures;
-import com.twitter.common.base.Command;
+import com.twitter.common.base.ExceptionalCommand;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.stats.StatsProvider;
@@ -46,11 +54,11 @@ import com.twitter.common.zookeeper.Group.JoinException;
 import com.twitter.common.zookeeper.ServerSet;
 import com.twitter.common.zookeeper.SingletonService.LeaderControl;
 
+import org.apache.aurora.GuavaUtils.ServiceManagerIface;
 import org.apache.aurora.scheduler.Driver.SettableDriver;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
@@ -134,7 +142,8 @@ public class SchedulerLifecycle implements EventSubscriber {
       Clock clock,
       EventSink eventSink,
       ShutdownRegistry shutdownRegistry,
-      StatsProvider statsProvider) {
+      StatsProvider statsProvider,
+      @SchedulerActive ServiceManagerIface schedulerActiveServiceManager) {
 
     this(
         driverFactory,
@@ -145,7 +154,8 @@ public class SchedulerLifecycle implements EventSubscriber {
         clock,
         eventSink,
         shutdownRegistry,
-        statsProvider);
+        statsProvider,
+        schedulerActiveServiceManager);
   }
 
   private static final class DefaultDelayedActions implements DelayedActions {
@@ -187,11 +197,6 @@ public class SchedulerLifecycle implements EventSubscriber {
           leadingOptions.registrationDelayLimit.getValue(),
           leadingOptions.registrationDelayLimit.getUnit().getTimeUnit());
     }
-
-    @Override
-    public void onRegistered(Runnable runnable) {
-      executorService.submit(runnable);
-    }
   }
 
   @VisibleForTesting
@@ -212,7 +217,8 @@ public class SchedulerLifecycle implements EventSubscriber {
       final Clock clock,
       final EventSink eventSink,
       final ShutdownRegistry shutdownRegistry,
-      StatsProvider statsProvider) {
+      StatsProvider statsProvider,
+      final ServiceManagerIface schedulerActiveServiceManager) {
 
     requireNonNull(driverFactory);
     requireNonNull(storage);
@@ -242,10 +248,12 @@ public class SchedulerLifecycle implements EventSubscriber {
           });
     }
 
-    shutdownRegistry.addAction(new Command() {
+    shutdownRegistry.addAction(new ExceptionalCommand<TimeoutException>() {
       @Override
-      public void execute() {
+      public void execute() throws TimeoutException {
         stateMachine.transition(State.DEAD);
+        schedulerActiveServiceManager.stopAsync();
+        schedulerActiveServiceManager.awaitStopped(5L, TimeUnit.SECONDS);
       }
     });
 
@@ -321,47 +329,14 @@ public class SchedulerLifecycle implements EventSubscriber {
           }
         });
 
-        // This action sequence must be deferred due to a subtle detail of how guava's EventBus
-        // works. EventBus event handlers are guaranteed to not be reentrant, meaning that posting
-        // an event from an event handler will not dispatch in the same sequence as the calls to
-        // post().
-        // In short, this is to enforce a happens-before relationship between delivering
-        // SchedulerActive and advertising leadership. Without deferring, you end up with a call
-        // sequence like this:
-        //
-        // - Enter DriverRegistered handler
-        //   - Post SchedulerActive event
-        //   - Announce leadership
-        // - Exit DriverRegistered handler
-        // - Dispatch SchedulerActive to subscribers
-        //
-        // With deferring, we get this instead:
-        //
-        // - Enter DriverRegistered handler
-        // - Exit DriverRegistered handler
-        // (executor service dispatch delay)
-        // - Post SchedulerActive Event
-        //   - Dispatch SchedulerActive to subscribers
-        // - Announce leadership
-        //
-        // The latter is preferable since it makes it easier to reason about the state of an
-        // announced scheduler.
-        delayedActions.onRegistered(new Runnable() {
-          @Override
-          public void run() {
-            eventSink.post(new SchedulerActive());
-            try {
-              leaderControl.get().advertise();
-            } catch (JoinException e) {
-              LOG.log(Level.SEVERE, "Failed to advertise leader, shutting down.", e);
-              stateMachine.transition(State.DEAD);
-            } catch (InterruptedException e) {
-              LOG.log(Level.SEVERE, "Interrupted while advertising leader, shutting down.", e);
-              stateMachine.transition(State.DEAD);
-              Thread.currentThread().interrupt();
-            }
-          }
-        });
+        // TODO(ksweeney): Extract leader advertisement to its own service.
+        schedulerActiveServiceManager.startAsync().awaitHealthy();
+        try {
+          leaderControl.get().advertise();
+        } catch (JoinException | InterruptedException e) {
+          LOG.log(Level.SEVERE, "Failed to advertise leader, shutting down.");
+          throw Throwables.propagate(e);
+        }
       }
     };
 
@@ -522,7 +497,14 @@ public class SchedulerLifecycle implements EventSubscriber {
     void onAutoFailover(Runnable runnable);
 
     void onRegistrationTimeout(Runnable runnable);
-
-    void onRegistered(Runnable runnable);
   }
+
+  /**
+   * Qualifier for services that will be run after the scheduler storage is available
+   * but before leadership is announced in ZooKeeper.
+   */
+  @Qualifier
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER})
+  static @interface SchedulerActive { }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2d68bc69/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
index 09c8bc9..8d30640 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
@@ -14,6 +14,7 @@
 package org.apache.aurora.scheduler;
 
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.logging.Logger;
@@ -21,17 +22,25 @@ import java.util.logging.Logger;
 import javax.inject.Singleton;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Service;
+import com.google.common.util.concurrent.ServiceManager;
 import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
 import com.google.inject.PrivateModule;
 import com.google.inject.Provides;
+import com.google.inject.binder.LinkedBindingBuilder;
+import com.google.inject.multibindings.Multibinder;
 import com.twitter.common.args.Arg;
 import com.twitter.common.args.CmdLine;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 
+import org.apache.aurora.GuavaUtils;
+import org.apache.aurora.GuavaUtils.ServiceManagerIface;
 import org.apache.aurora.scheduler.Driver.DriverImpl;
 import org.apache.aurora.scheduler.Driver.SettableDriver;
 import org.apache.aurora.scheduler.SchedulerLifecycle.LeadingOptions;
+import org.apache.aurora.scheduler.SchedulerLifecycle.SchedulerActive;
 import org.apache.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
 import org.apache.aurora.scheduler.async.GcExecutorLauncher;
 import org.apache.aurora.scheduler.base.AsyncUtil;
@@ -88,7 +97,9 @@ public class SchedulerModule extends AbstractModule {
     });
 
     PubsubEventModule.bindSubscriber(binder(), SchedulerLifecycle.class);
+    bind(TaskVars.class).in(Singleton.class);
     PubsubEventModule.bindSubscriber(binder(), TaskVars.class);
+    addSchedulerActiveServiceBinding(binder()).to(TaskVars.class);
   }
 
   @Provides
@@ -99,4 +110,24 @@ public class SchedulerModule extends AbstractModule {
 
     return ImmutableList.of(gcLauncher, userTaskLauncher);
   }
+
+  /**
+   * Register a Service to run after storage is ready, but before the scheduler has announced
+   * leadership. If this service fails to startup the scheduler will abort.
+   *
+   * Usage: {@code addSchedulerActiveServiceBinding(binder()).to(YourService.class)}.
+   *
+   * @param binder Binder for the current non-private module.
+   * @return a linked binding builder with the normal Guice EDSL methods.
+   */
+  public static LinkedBindingBuilder<Service> addSchedulerActiveServiceBinding(Binder binder) {
+    return Multibinder.newSetBinder(binder, Service.class, SchedulerActive.class).addBinding();
+  }
+
+  @Provides
+  @Singleton
+  @SchedulerActive
+  ServiceManagerIface provideSchedulerActiveServiceManager(@SchedulerActive Set<Service> services) {
+    return GuavaUtils.serviceManager(new ServiceManager(services));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2d68bc69/src/main/java/org/apache/aurora/scheduler/TaskVars.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskVars.java b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
index 3ebb8d0..cf8f758 100644
--- a/src/main/java/org/apache/aurora/scheduler/TaskVars.java
+++ b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
@@ -32,12 +32,12 @@ import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.AbstractIdleService;
 import com.twitter.common.stats.StatsProvider;
 
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 import org.apache.aurora.scheduler.storage.AttributeStore;
@@ -52,7 +52,7 @@ import static java.util.Objects.requireNonNull;
 /**
  * A container that tracks and exports stat counters for tasks.
  */
-class TaskVars implements EventSubscriber {
+class TaskVars extends AbstractIdleService implements EventSubscriber {
   private static final Logger LOG = Logger.getLogger(TaskVars.class.getName());
   private static final ImmutableSet<ScheduleStatus> TRACKED_JOB_STATES =
       ImmutableSet.of(ScheduleStatus.LOST, ScheduleStatus.FAILED);
@@ -180,8 +180,8 @@ class TaskVars implements EventSubscriber {
     updateJobCounters(task, task.getStatus());
   }
 
-  @Subscribe
-  public void schedulerActive(SchedulerActive event) {
+  @Override
+  protected void startUp() {
     // Dummy read the counter for each status counter. This is important to guarantee a stat with
     // value zero is present for each state, even if all states are not represented in the task
     // store.
@@ -193,6 +193,11 @@ class TaskVars implements EventSubscriber {
     exportCounters(untrackedCounters.asMap());
   }
 
+  @Override
+  protected void shutDown() {
+    // Ignored. VM shutdown is required to stop exporting task vars.
+  }
+
   private void exportCounters(Map<String, Counter> counterMap) {
     // Initiate export of all counters.  This is not done initially to avoid exporting values that
     // do not represent the entire storage contents.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2d68bc69/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronLifecycle.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronLifecycle.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronLifecycle.java
index 1d39783..64d9486 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronLifecycle.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronLifecycle.java
@@ -20,16 +20,12 @@ import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
-import com.google.common.eventbus.Subscribe;
 import com.google.common.util.concurrent.AbstractIdleService;
-import com.twitter.common.application.ShutdownRegistry;
-import com.twitter.common.base.Command;
 import com.twitter.common.stats.Stats;
 
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.cron.CronException;
 import org.apache.aurora.scheduler.cron.SanitizedCronJob;
-import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.quartz.Scheduler;
 import org.quartz.SchedulerException;
@@ -39,7 +35,7 @@ import static java.util.Objects.requireNonNull;
 /**
  * Manager for startup and teardown of Quartz scheduler.
  */
-class CronLifecycle extends AbstractIdleService implements PubsubEvent.EventSubscriber {
+class CronLifecycle extends AbstractIdleService {
   private static final Logger LOG = Logger.getLogger(CronLifecycle.class.getName());
 
   private static final AtomicInteger RUNNING_FLAG = Stats.exportInt("quartz_scheduler_running");
@@ -47,38 +43,14 @@ class CronLifecycle extends AbstractIdleService implements PubsubEvent.EventSubs
   private static final AtomicLong LAUNCH_FAILURES = Stats.exportLong("cron_job_launch_failures");
 
   private final Scheduler scheduler;
-  private final ShutdownRegistry shutdownRegistry;
   private final CronJobManagerImpl cronJobManager;
 
   @Inject
-  CronLifecycle(
-      Scheduler scheduler,
-      ShutdownRegistry shutdownRegistry,
-      CronJobManagerImpl cronJobManager) {
-
+  CronLifecycle(Scheduler scheduler, CronJobManagerImpl cronJobManager) {
     this.scheduler = requireNonNull(scheduler);
-    this.shutdownRegistry = requireNonNull(shutdownRegistry);
     this.cronJobManager = requireNonNull(cronJobManager);
   }
 
-  /**
-   * Notifies the cronScheduler job manager that the scheduler is active, and job configurations
-   * are ready to load.
-   *
-   * @param schedulerActive Event.
-   */
-  @Subscribe
-  public void schedulerActive(PubsubEvent.SchedulerActive schedulerActive) {
-    startAsync();
-    shutdownRegistry.addAction(new Command() {
-      @Override
-      public void execute() {
-        CronLifecycle.this.stopAsync().awaitTerminated();
-      }
-    });
-    awaitRunning();
-  }
-
   @Override
   protected void startUp() throws SchedulerException {
     LOG.info("Starting Quartz cron scheduler" + scheduler.getSchedulerName() + ".");

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2d68bc69/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
index 10465ce..22c666e 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
@@ -28,10 +28,10 @@ import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.util.BackoffHelper;
 
+import org.apache.aurora.scheduler.SchedulerModule;
 import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.cron.CronPredictor;
 import org.apache.aurora.scheduler.cron.CronScheduler;
-import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.quartz.Scheduler;
 import org.quartz.SchedulerException;
 import org.quartz.impl.StdSchedulerFactory;
@@ -90,7 +90,7 @@ public class CronModule extends AbstractModule {
         new BackoffHelper(CRON_START_INITIAL_BACKOFF.get(), CRON_START_MAX_BACKOFF.get())));
 
     bind(CronLifecycle.class).in(Singleton.class);
-    PubsubEventModule.bindSubscriber(binder(), CronLifecycle.class);
+    SchedulerModule.addSchedulerActiveServiceBinding(binder()).to(CronLifecycle.class);
   }
 
   @Provides

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2d68bc69/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
index e7fafec..9c095d9 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
@@ -246,15 +246,4 @@ public interface PubsubEvent {
     }
   }
 
-  class SchedulerActive implements PubsubEvent {
-    @Override
-    public boolean equals(Object o) {
-      return o != null && getClass().equals(o.getClass());
-    }
-
-    @Override
-    public int hashCode() {
-      return getClass().hashCode();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2d68bc69/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
index b5c8533..354844a 100644
--- a/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
@@ -23,21 +23,17 @@ import javax.inject.Inject;
 import javax.inject.Qualifier;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.inject.AbstractModule;
 import com.google.inject.Singleton;
-import com.twitter.common.application.modules.LifecycleModule;
 import com.twitter.common.args.Arg;
 import com.twitter.common.args.CmdLine;
 import com.twitter.common.args.constraints.Positive;
-import com.twitter.common.base.Command;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 
+import org.apache.aurora.scheduler.SchedulerModule;
 import org.apache.aurora.scheduler.base.AsyncUtil;
-import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
-import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.sla.MetricCalculator.MetricCalculatorSettings;
 
 import static java.lang.annotation.ElementType.FIELD;
@@ -84,11 +80,12 @@ public class SlaModule extends AbstractModule {
         .annotatedWith(SlaExecutor.class)
         .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("SlaStat-%d", LOG));
 
-    PubsubEventModule.bindSubscriber(binder(), SlaUpdater.class);
-    LifecycleModule.bindStartupAction(binder(), SlaUpdater.class);
+    bind(SlaUpdater.class).in(Singleton.class);
+    SchedulerModule.addSchedulerActiveServiceBinding(binder()).to(SlaUpdater.class);
   }
 
-  static class SlaUpdater implements Command, EventSubscriber {
+  // TODO(ksweeney): This should use AbstractScheduledService.
+  static class SlaUpdater extends AbstractIdleService {
     private final ScheduledExecutorService executor;
     private final MetricCalculator calculator;
     private final MetricCalculatorSettings settings;
@@ -104,16 +101,16 @@ public class SlaModule extends AbstractModule {
       this.settings = requireNonNull(settings);
     }
 
-    @Subscribe
-    public void schedulerActive(SchedulerActive event) {
+    @Override
+    protected void startUp() {
       long interval = settings.getRefreshRateMs();
       executor.scheduleAtFixedRate(calculator, interval, interval, TimeUnit.MILLISECONDS);
       LOG.info(String.format("Scheduled SLA calculation with %d msec interval.", interval));
     }
 
     @Override
-    public void execute() throws RuntimeException {
-      // Execution scheduled on SchedulerActive event.
+    protected void shutDown() {
+      // Ignored. VM shutdown is required to stop computing SLAs.
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2d68bc69/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriber.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriber.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriber.java
index 463a3aa..134cd54 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriber.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriber.java
@@ -18,6 +18,7 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.inject.Inject;
 import com.twitter.common.stats.Stats;
 
@@ -34,7 +35,7 @@ import static org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 /**
  * A pubsub event subscriber that forwards status updates to the job update controller.
  */
-class JobUpdateEventSubscriber implements PubsubEvent.EventSubscriber {
+class JobUpdateEventSubscriber extends AbstractIdleService implements PubsubEvent.EventSubscriber {
   private static final Logger LOG = Logger.getLogger(JobUpdateEventSubscriber.class.getName());
 
   private static final AtomicLong RECOVERY_ERRORS = Stats.exportLong("job_update_recovery_errors");
@@ -77,8 +78,9 @@ class JobUpdateEventSubscriber implements PubsubEvent.EventSubscriber {
     }
   }
 
-  @Subscribe
-  public void schedulerActive(PubsubEvent.SchedulerActive event) {
+  // TODO(ksweeney): Investigate letting this exception propagate up.
+  @Override
+  protected void startUp() {
     try {
       controller.systemResume();
     } catch (RuntimeException e) {
@@ -86,4 +88,9 @@ class JobUpdateEventSubscriber implements PubsubEvent.EventSubscriber {
       RECOVERY_ERRORS.incrementAndGet();
     }
   }
+
+  @Override
+  protected void shutDown() {
+    // Ignored. VM shutdown is required to stop processing updates.
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2d68bc69/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
index d4d3a25..9861601 100644
--- a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
@@ -14,21 +14,23 @@
 package org.apache.aurora.scheduler;
 
 import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Optional;
 import com.twitter.common.application.Lifecycle;
 import com.twitter.common.application.ShutdownRegistry;
 import com.twitter.common.base.Command;
+import com.twitter.common.base.ExceptionalCommand;
 import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.Clock;
 import com.twitter.common.zookeeper.SingletonService.LeaderControl;
 import com.twitter.common.zookeeper.SingletonService.LeadershipListener;
 
+import org.apache.aurora.GuavaUtils.ServiceManagerIface;
 import org.apache.aurora.scheduler.Driver.SettableDriver;
 import org.apache.aurora.scheduler.SchedulerLifecycle.DelayedActions;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
-import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
 import org.apache.aurora.scheduler.storage.Storage.StorageException;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
@@ -61,6 +63,7 @@ public class SchedulerLifecycleTest extends EasyMockTest {
   private DelayedActions delayedActions;
   private EventSink eventSink;
   private FakeStatsProvider statsProvider;
+  private ServiceManagerIface serviceManager;
 
   private SchedulerLifecycle schedulerLifecycle;
 
@@ -75,16 +78,17 @@ public class SchedulerLifecycleTest extends EasyMockTest {
     delayedActions = createMock(DelayedActions.class);
     eventSink = createMock(EventSink.class);
     statsProvider = new FakeStatsProvider();
+    serviceManager = createMock(ServiceManagerIface.class);
   }
 
   /**
-   * Composite interface to mimick a ShutdownRegistry implementation that can be triggered.
+   * Composite interface to mimic a ShutdownRegistry implementation that can be triggered.
    */
   private interface ShutdownSystem extends ShutdownRegistry, Command {
   }
 
-  private Capture<Command> replayAndCreateLifecycle() {
-    Capture<Command> shutdownCommand = createCapture();
+  private Capture<ExceptionalCommand<?>> replayAndCreateLifecycle() {
+    Capture<ExceptionalCommand<?>> shutdownCommand = createCapture();
     shutdownRegistry.addAction(capture(shutdownCommand));
 
     Clock clock = createMock(Clock.class);
@@ -105,7 +109,8 @@ public class SchedulerLifecycleTest extends EasyMockTest {
         clock,
         eventSink,
         shutdownRegistry,
-        statsProvider);
+        statsProvider,
+        serviceManager);
     assertEquals(0, statsProvider.getValue(SchedulerLifecycle.REGISTERED_GAUGE));
     assertEquals(1, statsProvider.getValue(stateGaugeName(State.IDLE)));
     return shutdownCommand;
@@ -123,12 +128,11 @@ public class SchedulerLifecycleTest extends EasyMockTest {
     delayedActions.blockingDriverJoin(EasyMock.<Runnable>anyObject());
   }
 
-  private Capture<Runnable> expectFullStartup() throws Exception {
-    Capture<Runnable> handleRegistered = createCapture();
-    delayedActions.onRegistered(capture(handleRegistered));
+  private void expectFullStartup() throws Exception {
     leaderControl.advertise();
-    eventSink.post(new SchedulerActive());
-    return handleRegistered;
+
+    expect(serviceManager.startAsync()).andReturn(serviceManager);
+    serviceManager.awaitHealthy();
   }
 
   private void expectShutdown() throws Exception {
@@ -152,7 +156,7 @@ public class SchedulerLifecycleTest extends EasyMockTest {
     delayedActions.onRegistrationTimeout(EasyMock.<Runnable>anyObject());
     expectInitializeDriver();
 
-    Capture<Runnable> handleRegistered = expectFullStartup();
+    expectFullStartup();
     expectShutdown();
 
     replayAndCreateLifecycle();
@@ -165,7 +169,6 @@ public class SchedulerLifecycleTest extends EasyMockTest {
     schedulerLifecycle.registered(new DriverRegistered());
     assertEquals(1, statsProvider.getValue(stateGaugeName(State.ACTIVE)));
     assertEquals(1, statsProvider.getValue(SchedulerLifecycle.REGISTERED_GAUGE));
-    handleRegistered.getValue().run();
     triggerFailover.getValue().run();
   }
 
@@ -237,15 +240,16 @@ public class SchedulerLifecycleTest extends EasyMockTest {
     delayedActions.onRegistrationTimeout(EasyMock.<Runnable>anyObject());
     expectInitializeDriver();
 
-    Capture<Runnable> handleRegistered = expectFullStartup();
+    expectFullStartup();
     expectShutdown();
+    expect(serviceManager.stopAsync()).andReturn(serviceManager);
+    serviceManager.awaitStopped(5, TimeUnit.SECONDS);
 
-    Capture<Command> shutdownCommand = replayAndCreateLifecycle();
+    Capture<ExceptionalCommand<?>> shutdownCommand = replayAndCreateLifecycle();
 
     LeadershipListener leaderListener = schedulerLifecycle.prepare();
     leaderListener.onLeading(leaderControl);
     schedulerLifecycle.registered(new DriverRegistered());
-    handleRegistered.getValue().run();
     shutdownCommand.getValue().execute();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2d68bc69/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java b/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
index e091ca3..12ea4c6 100644
--- a/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
@@ -32,7 +32,6 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
@@ -119,7 +118,7 @@ public class TaskVarsTest extends EasyMockTest {
     for (IScheduledTask task : initialTasks) {
       vars.taskChangedState(TaskStateChange.initialized(task));
     }
-    vars.schedulerActive(new SchedulerActive());
+    vars.startAsync().awaitRunning();
   }
 
   private IScheduledTask makeTask(String job, ScheduleStatus status, String host) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2d68bc69/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
index f3c7c5b..915d7c8 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
@@ -20,8 +20,6 @@ import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.util.Modules;
-import com.twitter.common.application.ShutdownRegistry;
-import com.twitter.common.base.ExceptionalCommand;
 import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.Clock;
 
@@ -33,15 +31,11 @@ import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.cron.CrontabEntry;
 import org.apache.aurora.scheduler.cron.SanitizedCronJob;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.state.PubsubTestUtil;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.mem.MemStorage;
-import org.easymock.Capture;
 import org.easymock.IAnswer;
 import org.junit.Before;
 import org.junit.Test;
@@ -50,7 +44,6 @@ import org.quartz.Scheduler;
 import org.quartz.Trigger;
 import org.quartz.TriggerListener;
 
-import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.isA;
 import static org.junit.Assert.assertEquals;
@@ -82,18 +75,13 @@ public class CronIT extends EasyMockTest {
               .setDiskMb(9))
   );
 
-  private ShutdownRegistry shutdownRegistry;
-  private EventSink eventSink;
   private Injector injector;
   private StateManager stateManager;
   private Storage storage;
   private AuroraCronJob auroraCronJob;
 
-  private Capture<ExceptionalCommand<?>> shutdown;
-
   @Before
   public void setUp() throws Exception {
-    shutdownRegistry = createMock(ShutdownRegistry.class);
     stateManager = createMock(StateManager.class);
     storage = MemStorage.newEmptyStorage();
     auroraCronJob = createMock(AuroraCronJob.class);
@@ -110,21 +98,16 @@ public class CronIT extends EasyMockTest {
           @Override
           protected void configure() {
             bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
-            bind(ShutdownRegistry.class).toInstance(shutdownRegistry);
             bind(StateManager.class).toInstance(stateManager);
             bind(Storage.class).toInstance(storage);
-
-            PubsubTestUtil.installPubsub(binder());
           }
         });
-    eventSink = PubsubTestUtil.startPubsub(injector);
-
-    shutdown = createCapture();
-    shutdownRegistry.addAction(capture(shutdown));
   }
 
-  private void boot() {
-    eventSink.post(new PubsubEvent.SchedulerActive());
+  private Service boot() {
+    Service service = injector.getInstance(CronLifecycle.class);
+    service.startAsync().awaitRunning();
+    return service;
   }
 
   @Test
@@ -134,13 +117,12 @@ public class CronIT extends EasyMockTest {
     Scheduler scheduler = injector.getInstance(Scheduler.class);
     assertTrue(!scheduler.isStarted());
 
-    boot();
-    Service cronLifecycle = injector.getInstance(CronLifecycle.class);
+    Service cronLifecycle = boot();
 
     assertTrue(cronLifecycle.isRunning());
     assertTrue(scheduler.isStarted());
 
-    shutdown.getValue().execute();
+    cronLifecycle.stopAsync().awaitTerminated();
 
     assertTrue(!cronLifecycle.isRunning());
     assertTrue(scheduler.isShutdown());
@@ -165,11 +147,11 @@ public class CronIT extends EasyMockTest {
 
     final CountDownLatch cronRan = new CountDownLatch(1);
     scheduler.getListenerManager().addTriggerListener(new CountDownWhenComplete(cronRan));
-    boot();
+    Service service = boot();
 
     cronRan.await();
 
-    shutdown.getValue().execute();
+    service.stopAsync().awaitTerminated();
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2d68bc69/src/test/java/org/apache/aurora/scheduler/sla/SlaModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/sla/SlaModuleTest.java b/src/test/java/org/apache/aurora/scheduler/sla/SlaModuleTest.java
index 850c9a9..5ee123a 100644
--- a/src/test/java/org/apache/aurora/scheduler/sla/SlaModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/sla/SlaModuleTest.java
@@ -36,9 +36,7 @@ import com.twitter.common.util.Clock;
 import com.twitter.common.util.testing.FakeClock;
 
 import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
-import org.apache.aurora.scheduler.state.PubsubTestUtil;
+import org.apache.aurora.scheduler.sla.SlaModule.SlaUpdater;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.easymock.EasyMock;
@@ -58,7 +56,6 @@ public class SlaModuleTest extends EasyMockTest {
   private StorageTestUtil storageUtil;
   private StatsProvider statsProvider;
   private SlaModule module;
-  private EventSink eventSink;
 
   @Before
   public void setUp() throws Exception {
@@ -74,14 +71,12 @@ public class SlaModuleTest extends EasyMockTest {
             .add(new AbstractModule() {
               @Override
               protected void configure() {
-                PubsubTestUtil.installPubsub(binder());
                 bind(Clock.class).toInstance(clock);
                 bind(Storage.class).toInstance(storageUtil.storage);
                 bind(StatsProvider.class).toInstance(statsProvider);
               }
             }).build()
     );
-    eventSink = PubsubTestUtil.startPubsub(injector);
   }
 
   @Test
@@ -121,7 +116,7 @@ public class SlaModuleTest extends EasyMockTest {
 
     control.replay();
 
-    eventSink.post(new SchedulerActive());
+    injector.getInstance(SlaUpdater.class).startAsync().awaitRunning();
     latch.await();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2d68bc69/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriberTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriberTest.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriberTest.java
index c53cfe0..da3986d 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriberTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriberTest.java
@@ -15,6 +15,7 @@ package org.apache.aurora.scheduler.updater;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.eventbus.EventBus;
+import com.google.common.util.concurrent.Service;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
 import org.apache.aurora.gen.AssignedTask;
@@ -30,7 +31,6 @@ import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
 import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import static org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 import static org.easymock.EasyMock.expectLastCall;
@@ -56,6 +56,7 @@ public class JobUpdateEventSubscriberTest extends EasyMockTest {
           .setInstanceId(TASK.getAssignedTask().getInstanceId()));
 
   private JobUpdateController updater;
+  private Service service;
 
   private EventBus eventBus;
 
@@ -63,8 +64,10 @@ public class JobUpdateEventSubscriberTest extends EasyMockTest {
   public void setUp() {
     updater = createMock(JobUpdateController.class);
 
+    service = new JobUpdateEventSubscriber(updater);
+
     eventBus = new EventBus();
-    eventBus.register(new JobUpdateEventSubscriber(updater));
+    eventBus.register(service);
   }
 
   @Test
@@ -91,7 +94,7 @@ public class JobUpdateEventSubscriberTest extends EasyMockTest {
 
     control.replay();
 
-    eventBus.post(new SchedulerActive());
+    service.startAsync().awaitRunning();
   }
 
   @Test
@@ -105,7 +108,7 @@ public class JobUpdateEventSubscriberTest extends EasyMockTest {
 
     control.replay();
 
-    eventBus.post(new SchedulerActive());
+    service.startAsync().awaitRunning();
     eventBus.post(TaskStateChange.initialized(TASK));
     eventBus.post(new TasksDeleted(ImmutableSet.of(TASK)));
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2d68bc69/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
index 8baec04..88319ea 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -120,7 +120,6 @@ import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
 import static org.apache.aurora.gen.ScheduleStatus.KILLED;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.STARTING;
-import static org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
 import static org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import static org.apache.aurora.scheduler.updater.UpdateFactory.UpdateFactoryImpl.expandInstanceIds;
 import static org.easymock.EasyMock.expectLastCall;
@@ -766,7 +765,7 @@ public class JobUpdaterIT extends EasyMockTest {
       }
     });
 
-    eventBus.post(new SchedulerActive());
+    subscriber.startAsync().awaitRunning();
 
     // Instance 0 is updated.
     changeState(JOB, 0, KILLED, ASSIGNED, STARTING, RUNNING);
@@ -798,7 +797,7 @@ public class JobUpdaterIT extends EasyMockTest {
       }
     });
 
-    eventBus.post(new SchedulerActive());
+    subscriber.startAsync().awaitRunning();
     assertState(ERROR, ImmutableMultimap.<Integer, JobUpdateAction>of());
   }
 


Mime
View raw message