aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject incubator-aurora git commit: Send an event for host attributes changing rather than maintenance mode changing.
Date Wed, 12 Nov 2014 00:10:38 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 0774c4e67 -> 7ecb718aa


Send an event for host attributes changing rather than maintenance mode changing.

Bugs closed: AURORA-913

Reviewed at https://reviews.apache.org/r/27889/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/7ecb718a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/7ecb718a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/7ecb718a

Branch: refs/heads/master
Commit: 7ecb718aa961463c8daa26f6e844c23a494c8680
Parents: 0774c4e
Author: Bill Farner <wfarner@apache.org>
Authored: Tue Nov 11 16:08:04 2014 -0800
Committer: Bill Farner <wfarner@apache.org>
Committed: Tue Nov 11 16:08:04 2014 -0800

----------------------------------------------------------------------
 .../apache/aurora/scheduler/app/AppModule.java  |  3 +-
 .../aurora/scheduler/async/OfferQueue.java      | 10 ++--
 .../aurora/scheduler/events/PubsubEvent.java    | 33 +++++++----
 .../scheduler/events/PubsubEventModule.java     | 40 ++++++++-----
 .../scheduler/state/MaintenanceController.java  | 11 +---
 .../scheduler/storage/log/LogStorage.java       | 62 +++++++++++---------
 .../storage/log/WriteAheadStorage.java          |  8 ++-
 .../aurora/scheduler/async/KillRetryTest.java   |  4 +-
 .../scheduler/async/TaskSchedulerImplTest.java  | 30 +++++-----
 .../scheduler/async/TaskSchedulerTest.java      |  8 ++-
 .../scheduler/events/PubsubEventModuleTest.java | 55 +++++++++++++++++
 .../state/MaintenanceControllerImplTest.java    |  7 +--
 .../aurora/scheduler/state/PubsubTestUtil.java  | 11 ----
 .../scheduler/storage/log/LogStorageTest.java   | 37 +++++++-----
 .../storage/log/WriteAheadStorageTest.java      | 37 +++++++++++-
 15 files changed, 232 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7ecb718a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
index fef76f5..19bf162 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
@@ -106,7 +106,8 @@ class AppModule extends AbstractModule {
                 .setStatsUrlPrefix(statsUrlPrefix)));
 
     // Filter layering: notifier filter -> base impl
-    PubsubEventModule.bind(binder(), SchedulingFilterImpl.class);
+    install(new PubsubEventModule(true));
+    PubsubEventModule.bindSchedulingFilterDelegate(binder()).to(SchedulingFilterImpl.class);
     bind(SchedulingFilterImpl.class).in(Singleton.class);
 
     LifecycleModule.bindStartupAction(binder(), RegisterShutdownStackPrinter.class);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7ecb718a/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
index 14c3e3a..dd8a900 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
@@ -39,7 +39,6 @@ import com.twitter.common.stats.Stats;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.HostMaintenanceStateChange;
 import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.mesos.Protos.Offer;
@@ -53,6 +52,7 @@ import static org.apache.aurora.gen.MaintenanceMode.DRAINED;
 import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
 import static org.apache.aurora.gen.MaintenanceMode.NONE;
 import static org.apache.aurora.gen.MaintenanceMode.SCHEDULED;
+import static org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged;
 
 /**
  * Tracks the Offers currently known by the scheduler.
@@ -86,11 +86,11 @@ public interface OfferQueue extends EventSubscriber {
   boolean launchFirst(Function<HostOffer, Optional<TaskInfo>> acceptor) throws
LaunchException;
 
   /**
-   * Notifies the offer queue that a host has changed state.
+   * Notifies the offer queue that a host's attributes have changed.
    *
    * @param change State change notification.
    */
-  void hostChangedState(HostMaintenanceStateChange change);
+  void hostAttributesChanged(HostAttributesChanged change);
 
   /**
    * Gets the offers that the scheduler is holding.
@@ -251,8 +251,8 @@ public interface OfferQueue extends EventSubscriber {
      * @param change Host change notification.
      */
     @Subscribe
-    public void hostChangedState(HostMaintenanceStateChange change) {
-      hostOffers.updateHostMode(change.getStatus().getHost(), change.getStatus().getMode());
+    public void hostAttributesChanged(HostAttributesChanged change) {
+      hostOffers.updateHostMode(change.getAttributes().getHost(), change.getAttributes().getMode());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7ecb718a/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
index 9c095d9..4821a78 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
@@ -18,10 +18,10 @@ import java.util.Set;
 
 import com.google.common.base.Optional;
 
-import org.apache.aurora.gen.HostStatus;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
 import static java.util.Objects.requireNonNull;
@@ -156,32 +156,39 @@ public interface PubsubEvent {
   }
 
   /**
-   * Event sent when a host changed maintenance state.
+   * Event sent when a host's attributes change.
    */
-  class HostMaintenanceStateChange implements PubsubEvent {
-    private final HostStatus status;
+  class HostAttributesChanged implements PubsubEvent {
+    private final IHostAttributes attributes;
 
-    public HostMaintenanceStateChange(HostStatus status) {
-      this.status = requireNonNull(status);
+    public HostAttributesChanged(IHostAttributes attributes) {
+      this.attributes = requireNonNull(attributes);
     }
 
-    public HostStatus getStatus() {
-      return status;
+    public IHostAttributes getAttributes() {
+      return attributes;
+    }
+
+    @Override
+    public int hashCode() {
+      return attributes.hashCode();
     }
 
     @Override
     public boolean equals(Object o) {
-      if (!(o instanceof HostMaintenanceStateChange)) {
+      if (!(o instanceof HostAttributesChanged)) {
         return false;
       }
 
-      HostMaintenanceStateChange other = (HostMaintenanceStateChange) o;
-      return Objects.equals(status, other.status);
+      HostAttributesChanged other = (HostAttributesChanged) o;
+      return Objects.equals(attributes, other.getAttributes());
     }
 
     @Override
-    public int hashCode() {
-      return Objects.hash(status);
+    public String toString() {
+      return com.google.common.base.Objects.toStringHelper(this)
+          .add("attributes", getAttributes())
+          .toString();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7ecb718a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
index 0ba31ac..24cd750 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
@@ -30,6 +30,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
+import com.google.inject.binder.LinkedBindingBuilder;
 import com.google.inject.multibindings.Multibinder;
 import com.twitter.common.application.modules.LifecycleModule;
 import com.twitter.common.base.Command;
@@ -45,20 +46,22 @@ import static java.util.Objects.requireNonNull;
  */
 public final class PubsubEventModule extends AbstractModule {
 
-  private static final Logger LOG = Logger.getLogger(PubsubEventModule.class.getName());
-
   private final boolean async;
+  private final Logger log;
 
-  private PubsubEventModule(boolean async) {
-    // Must be constructed through factory.
-    this.async = async;
+  @VisibleForTesting
+  PubsubEventModule(boolean async, Logger log) {
+    this.log = requireNonNull(log);
+    this.async = requireNonNull(async);
   }
 
-  @VisibleForTesting
-  public static void installForTest(Binder binder) {
-    binder.install(new PubsubEventModule(false));
+  public PubsubEventModule(boolean async) {
+    this(async, Logger.getLogger(PubsubEventModule.class.getName()));
   }
 
+  @VisibleForTesting
+  static final String DEAD_EVENT_MESSAGE = "Captured dead event %s";
+
   @Override
   protected void configure() {
     final Executor executor;
@@ -75,7 +78,7 @@ public final class PubsubEventModule extends AbstractModule {
     final EventBus eventBus = new AsyncEventBus("AsyncTaskEvents", executor);
     eventBus.register(new Object() {
       @Subscribe public void logDeadEvent(DeadEvent event) {
-        LOG.warning("Captured dead event " + event.getEvent());
+        log.warning(String.format(DEAD_EVENT_MESSAGE, event.getEvent()));
       }
     });
     bind(EventBus.class).toInstance(eventBus);
@@ -112,15 +115,24 @@ public final class PubsubEventModule extends AbstractModule {
   }
 
   /**
-   * Binds a task event module.
+   * Gets a binding builder that must be used to wire up the scheduling filter implementation
+   * that backs the delegating scheduling filter that fires pubsub events.
    *
-   * @param binder Binder to bind against.
-   * @param filterClass Delegate scheduling filter implementation class.
+   * @param binder Binder to create a binding against.
+   * @return A linked binding builder that may be used to wire up the scheduling filter.
    */
-  public static void bind(Binder binder, final Class<? extends SchedulingFilter> filterClass)
{
-    binder.bind(SchedulingFilter.class).annotatedWith(NotifyDelegate.class).to(filterClass);
+  public static LinkedBindingBuilder<SchedulingFilter> bindSchedulingFilterDelegate(Binder
binder) {
     binder.bind(SchedulingFilter.class).to(NotifyingSchedulingFilter.class);
     binder.bind(NotifyingSchedulingFilter.class).in(Singleton.class);
+    return binder.bind(SchedulingFilter.class).annotatedWith(NotifyDelegate.class);
+  }
+
+  /**
+   * Binds a task event module.
+   *
+   * @param binder Binder to bind against.
+   */
+  public static void bind(Binder binder) {
     binder.install(new PubsubEventModule(true));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7ecb718a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
index 88418e6..077699f 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
@@ -32,8 +32,6 @@ import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.storage.AttributeStore;
@@ -105,17 +103,11 @@ public interface MaintenanceController {
     private static final Logger LOG = Logger.getLogger(MaintenanceControllerImpl.class.getName());
     private final Storage storage;
     private final StateManager stateManager;
-    private final EventSink eventSink;
 
     @Inject
-    public MaintenanceControllerImpl(
-        Storage storage,
-        StateManager stateManager,
-        EventSink eventSink) {
-
+    public MaintenanceControllerImpl(Storage storage, StateManager stateManager) {
       this.storage = requireNonNull(storage);
       this.stateManager = requireNonNull(stateManager);
-      this.eventSink = requireNonNull(eventSink);
     }
 
     private Set<HostStatus> watchDrainingTasks(MutableStoreProvider store, Set<String>
hosts) {
@@ -272,7 +264,6 @@ public interface MaintenanceController {
         if (toSave.isPresent()) {
           store.saveHostAttributes(toSave.get());
           HostStatus status = new HostStatus().setHost(host).setMode(mode);
-          eventSink.post(new PubsubEvent.HostMaintenanceStateChange(status.deepCopy()));
           statuses.add(status);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7ecb718a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
index 32890f5..45ea50f 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
@@ -31,7 +31,6 @@ import javax.inject.Qualifier;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
-
 import com.twitter.common.application.ShutdownRegistry;
 import com.twitter.common.base.Closure;
 import com.twitter.common.inject.TimedInterceptor.Timed;
@@ -49,6 +48,7 @@ import org.apache.aurora.gen.storage.SaveQuota;
 import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.scheduler.base.AsyncUtil;
 import org.apache.aurora.scheduler.base.SchedulerException;
+import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.log.Log.Stream.InvalidPositionException;
 import org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException;
 import org.apache.aurora.scheduler.storage.AttributeStore;
@@ -226,19 +226,21 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
   private final Map<Op._Fields, Closure<Op>> transactionReplayActions;
 
   @Inject
-  LogStorage(LogManager logManager,
-             ShutdownRegistry shutdownRegistry,
-             @ShutdownGracePeriod Amount<Long, Time> shutdownGracePeriod,
-             SnapshotStore<Snapshot> snapshotStore,
-             @SnapshotInterval Amount<Long, Time> snapshotInterval,
-             @WriteBehind Storage storage,
-             @WriteBehind SchedulerStore.Mutable schedulerStore,
-             @WriteBehind JobStore.Mutable jobStore,
-             @WriteBehind TaskStore.Mutable taskStore,
-             @WriteBehind LockStore.Mutable lockStore,
-             @WriteBehind QuotaStore.Mutable quotaStore,
-             @WriteBehind AttributeStore.Mutable attributeStore,
-             @WriteBehind JobUpdateStore.Mutable jobUpdateStore) {
+  LogStorage(
+      LogManager logManager,
+      ShutdownRegistry shutdownRegistry,
+      @ShutdownGracePeriod Amount<Long, Time> shutdownGracePeriod,
+      SnapshotStore<Snapshot> snapshotStore,
+      @SnapshotInterval Amount<Long, Time> snapshotInterval,
+      @WriteBehind Storage storage,
+      @WriteBehind SchedulerStore.Mutable schedulerStore,
+      @WriteBehind JobStore.Mutable jobStore,
+      @WriteBehind TaskStore.Mutable taskStore,
+      @WriteBehind LockStore.Mutable lockStore,
+      @WriteBehind QuotaStore.Mutable quotaStore,
+      @WriteBehind AttributeStore.Mutable attributeStore,
+      @WriteBehind JobUpdateStore.Mutable jobUpdateStore,
+      EventSink eventSink) {
 
     this(logManager,
         new ScheduledExecutorSchedulingService(shutdownRegistry, shutdownGracePeriod),
@@ -251,22 +253,25 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
         lockStore,
         quotaStore,
         attributeStore,
-        jobUpdateStore);
+        jobUpdateStore,
+        eventSink);
   }
 
   @VisibleForTesting
-  LogStorage(LogManager logManager,
-             SchedulingService schedulingService,
-             SnapshotStore<Snapshot> snapshotStore,
-             Amount<Long, Time> snapshotInterval,
-             Storage delegateStorage,
-             SchedulerStore.Mutable schedulerStore,
-             JobStore.Mutable jobStore,
-             TaskStore.Mutable taskStore,
-             LockStore.Mutable lockStore,
-             QuotaStore.Mutable quotaStore,
-             AttributeStore.Mutable attributeStore,
-             JobUpdateStore.Mutable jobUpdateStore) {
+  LogStorage(
+      LogManager logManager,
+      SchedulingService schedulingService,
+      SnapshotStore<Snapshot> snapshotStore,
+      Amount<Long, Time> snapshotInterval,
+      Storage delegateStorage,
+      SchedulerStore.Mutable schedulerStore,
+      JobStore.Mutable jobStore,
+      TaskStore.Mutable taskStore,
+      LockStore.Mutable lockStore,
+      QuotaStore.Mutable quotaStore,
+      AttributeStore.Mutable attributeStore,
+      JobUpdateStore.Mutable jobUpdateStore,
+      EventSink eventSink) {
 
     this.logManager = requireNonNull(logManager);
     this.schedulingService = requireNonNull(schedulingService);
@@ -305,7 +310,8 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
         quotaStore,
         attributeStore,
         jobUpdateStore,
-        Logger.getLogger(WriteAheadStorage.class.getName()));
+        Logger.getLogger(WriteAheadStorage.class.getName()),
+        eventSink);
 
     this.logEntryReplayActions = buildLogEntryReplayActions();
     this.transactionReplayActions = buildTransactionReplayActions();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7ecb718a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
index c6250b4..2d27599 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
@@ -43,6 +43,8 @@ import org.apache.aurora.gen.storage.SaveQuota;
 import org.apache.aurora.gen.storage.SaveTasks;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.ForwardingStore;
 import org.apache.aurora.scheduler.storage.JobStore;
@@ -92,6 +94,7 @@ class WriteAheadStorage extends ForwardingStore implements
   private final AttributeStore.Mutable attributeStore;
   private final JobUpdateStore.Mutable jobUpdateStore;
   private final Logger log;
+  private final EventSink eventSink;
 
   /**
    * Creates a new write-ahead storage that delegates to the providing default stores.
@@ -114,7 +117,8 @@ class WriteAheadStorage extends ForwardingStore implements
       QuotaStore.Mutable quotaStore,
       AttributeStore.Mutable attributeStore,
       JobUpdateStore.Mutable jobUpdateStore,
-      Logger log) {
+      Logger log,
+      EventSink eventSink) {
 
     super(
         schedulerStore,
@@ -134,6 +138,7 @@ class WriteAheadStorage extends ForwardingStore implements
     this.attributeStore = requireNonNull(attributeStore);
     this.jobUpdateStore = requireNonNull(jobUpdateStore);
     this.log = requireNonNull(log);
+    this.eventSink = requireNonNull(eventSink);
   }
 
   private void write(Op op) {
@@ -216,6 +221,7 @@ class WriteAheadStorage extends ForwardingStore implements
     boolean changed = attributeStore.saveHostAttributes(attrs);
     if (changed) {
       write(Op.saveHostAttributes(new SaveHostAttributes(attrs.newBuilder())));
+      eventSink.post(new PubsubEvent.HostAttributesChanged(attrs));
     }
     return changed;
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7ecb718a/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java b/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java
index 662ebdc..e4e252e 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java
@@ -80,20 +80,20 @@ public class KillRetryTest extends EasyMockTest {
     statsProvider = new FakeStatsProvider();
 
     Injector injector = Guice.createInjector(
+        new LifecycleModule(),
+        new PubsubEventModule(false),
         new AbstractModule() {
           @Override
           protected void configure() {
             bind(Driver.class).toInstance(driver);
             bind(Storage.class).toInstance(storageUtil.storage);
             bind(ScheduledExecutorService.class).toInstance(executorMock);
-            PubsubEventModule.installForTest(binder());
             PubsubEventModule.bindSubscriber(binder(), KillRetry.class);
             bind(KillRetry.class).in(Singleton.class);
             bind(BackoffStrategy.class).toInstance(backoffStrategy);
             bind(StatsProvider.class).toInstance(statsProvider);
             bind(UncaughtExceptionHandler.class)
                 .toInstance(createMock(UncaughtExceptionHandler.class));
-            install(new LifecycleModule());
           }
         }
     );

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7ecb718a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index 2f52510..0e699c9 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -39,6 +39,7 @@ import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.state.PubsubTestUtil;
 import org.apache.aurora.scheduler.state.StateManager;
@@ -104,20 +105,21 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   }
 
   private Injector getInjector(final Storage storageImpl) {
-    return Guice.createInjector(new AbstractModule() {
-      @Override
-      protected void configure() {
-        PubsubTestUtil.installPubsub(binder());
-        bind(AsyncModule.PREEMPTOR_KEY).toInstance(preemptor);
-        AsyncModule.bindTaskScheduler(binder(), AsyncModule.PREEMPTOR_KEY, reservationDuration);
-        bind(OfferQueue.class).toInstance(offerQueue);
-        bind(StateManager.class).toInstance(stateManager);
-        bind(TaskAssigner.class).toInstance(assigner);
-        bind(Clock.class).toInstance(clock);
-        bind(Storage.class).toInstance(storageImpl);
-        bind(StatsProvider.class).toInstance(Stats.STATS_PROVIDER);
-      }
-    });
+    return Guice.createInjector(
+        new PubsubEventModule(false),
+        new AbstractModule() {
+          @Override
+          protected void configure() {
+            bind(AsyncModule.PREEMPTOR_KEY).toInstance(preemptor);
+            AsyncModule.bindTaskScheduler(binder(), AsyncModule.PREEMPTOR_KEY, reservationDuration);
+            bind(OfferQueue.class).toInstance(offerQueue);
+            bind(StateManager.class).toInstance(stateManager);
+            bind(TaskAssigner.class).toInstance(assigner);
+            bind(Clock.class).toInstance(clock);
+            bind(Storage.class).toInstance(storageImpl);
+            bind(StatsProvider.class).toInstance(Stats.STATS_PROVIDER);
+          }
+        });
   }
 
   private void expectTaskStillPendingQuery(IScheduledTask task) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7ecb718a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 7736d4c..7cf5d3e 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -33,7 +33,7 @@ import com.twitter.common.util.BackoffStrategy;
 import com.twitter.common.util.testing.FakeClock;
 
 import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.HostStatus;
+import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.MaintenanceMode;
@@ -45,7 +45,7 @@ import org.apache.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
 import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.events.PubsubEvent.HostMaintenanceStateChange;
+import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
@@ -59,6 +59,7 @@ import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.Storage.StorageException;
 import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.mem.MemStorage;
 import org.apache.mesos.Protos.Offer;
@@ -621,6 +622,7 @@ public class TaskSchedulerTest extends EasyMockTest {
   }
 
   private void changeHostMaintenanceState(String hostName, MaintenanceMode mode) {
-    offerQueue.hostChangedState(new HostMaintenanceStateChange(new HostStatus(hostName, mode)));
+    offerQueue.hostAttributesChanged(new PubsubEvent.HostAttributesChanged(
+        IHostAttributes.build(new HostAttributes().setHost(hostName).setMode(mode))));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7ecb718a/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java b/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java
new file mode 100644
index 0000000..af3f0e3
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.events;
+
+import java.util.logging.Logger;
+
+import com.google.common.eventbus.EventBus;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PubsubEventModuleTest extends EasyMockTest {
+
+  private Logger logger;
+  private Injector injector;
+
+  @Before
+  public void setUp() {
+    logger = createMock(Logger.class);
+    injector = Guice.createInjector(
+        new PubsubEventModule(false, logger),
+        new AbstractModule() {
+          @Override
+          protected void configure() {
+            PubsubEventModule.bindSchedulingFilterDelegate(binder())
+                .toInstance(createMock(SchedulingFilter.class));
+          }
+        });
+  }
+
+  @Test
+  public void testHandlesDeadEvent() {
+    logger.warning(String.format(PubsubEventModule.DEAD_EVENT_MESSAGE, "hello"));
+
+    control.replay();
+
+    injector.getInstance(EventBus.class).post("hello");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7ecb718a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
index 99fa11b..4739909 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
@@ -33,8 +33,8 @@ import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -69,10 +69,10 @@ public class MaintenanceControllerImplTest extends EasyMockTest {
     stateManager = createMock(StateManager.class);
 
     Injector injector = Guice.createInjector(
+        new PubsubEventModule(false),
         new AbstractModule() {
           @Override
           protected void configure() {
-            PubsubTestUtil.installPubsub(binder());
             StateModule.bindMaintenanceController(binder());
             bind(Storage.class).toInstance(storageUtil.storage);
             bind(StateManager.class).toInstance(stateManager);
@@ -192,9 +192,6 @@ public class MaintenanceControllerImplTest extends EasyMockTest {
         .andReturn(Optional.of(attributes));
     IHostAttributes updated = IHostAttributes.build(attributes.newBuilder().setMode(mode));
     expect(storageUtil.attributeStore.saveHostAttributes(updated)).andReturn(true);
-    eventSink.post(
-        new PubsubEvent.HostMaintenanceStateChange(
-            new HostStatus().setHost(hostName).setMode(mode)));
   }
 
   private void assertStatus(String host, MaintenanceMode mode, Set<HostStatus> statuses)
{

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7ecb718a/src/test/java/org/apache/aurora/scheduler/state/PubsubTestUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/PubsubTestUtil.java b/src/test/java/org/apache/aurora/scheduler/state/PubsubTestUtil.java
index e884209..48b3d88 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/PubsubTestUtil.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/PubsubTestUtil.java
@@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.state;
 
 import java.util.Set;
 
-import com.google.inject.Binder;
 import com.google.inject.Injector;
 import com.google.inject.Key;
 import com.google.inject.TypeLiteral;
@@ -23,7 +22,6 @@ import com.twitter.common.application.StartupStage;
 import com.twitter.common.base.ExceptionalCommand;
 
 import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEventModule;
 
 /**
  * A convenience utility for unit tests that which to verify pubsub wiring.
@@ -36,15 +34,6 @@ public final class PubsubTestUtil {
   }
 
   /**
-   * Installs the pubsub system on the given binder.
-   *
-   * @param binder Binder to install pubsub system onto.
-   */
-  public static void installPubsub(Binder binder) {
-    PubsubEventModule.installForTest(binder);
-  }
-
-  /**
    * Starts the pubsub system and gets a handle to the event sink where pubsub events may
be sent.
    *
    * @param injector Injector where the pubsub system was installed.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7ecb718a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
index c6ff43f..259c6a9 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
@@ -27,7 +27,6 @@ import com.google.common.collect.Sets;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
 import com.google.common.testing.TearDown;
-
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Data;
 import com.twitter.common.quantity.Time;
@@ -79,6 +78,8 @@ import org.apache.aurora.gen.storage.storageConstants;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.log.Log;
 import org.apache.aurora.scheduler.log.Log.Entry;
 import org.apache.aurora.scheduler.log.Log.Position;
@@ -134,6 +135,7 @@ public class LogStorageTest extends EasyMockTest {
   private SnapshotStore<Snapshot> snapshotStore;
   private StorageTestUtil storageUtil;
   private SnapshotDeduplicator snapshotDeduplicator;
+  private EventSink eventSink;
 
   @Before
   public void setUp() {
@@ -160,20 +162,22 @@ public class LogStorageTest extends EasyMockTest {
     schedulingService = createMock(SchedulingService.class);
     snapshotStore = createMock(new Clazz<SnapshotStore<Snapshot>>() { });
     storageUtil = new StorageTestUtil(this);
-
-    logStorage =
-        new LogStorage(logManager,
-            schedulingService,
-            snapshotStore,
-            SNAPSHOT_INTERVAL,
-            storageUtil.storage,
-            storageUtil.schedulerStore,
-            storageUtil.jobStore,
-            storageUtil.taskStore,
-            storageUtil.lockStore,
-            storageUtil.quotaStore,
-            storageUtil.attributeStore,
-            storageUtil.jobUpdateStore);
+    eventSink = createMock(EventSink.class);
+
+    logStorage = new LogStorage(
+        logManager,
+        schedulingService,
+        snapshotStore,
+        SNAPSHOT_INTERVAL,
+        storageUtil.storage,
+        storageUtil.schedulerStore,
+        storageUtil.jobStore,
+        storageUtil.taskStore,
+        storageUtil.lockStore,
+        storageUtil.quotaStore,
+        storageUtil.attributeStore,
+        storageUtil.jobUpdateStore,
+        eventSink);
 
     stream = createMock(Stream.class);
     streamMatcher = LogOpMatcher.matcherFor(stream);
@@ -306,7 +310,7 @@ public class LogStorageTest extends EasyMockTest {
         rewriteTask.getTaskId(),
         ITaskConfig.build(rewriteTask.getTask()))).andReturn(true);
 
-    RemoveTasks removeTasks = new RemoveTasks(ImmutableSet.<String>of("taskId1"));
+    RemoveTasks removeTasks = new RemoveTasks(ImmutableSet.of("taskId1"));
     builder.add(createTransaction(Op.removeTasks(removeTasks)));
     storageUtil.taskStore.deleteTasks(removeTasks.getTaskIds());
 
@@ -841,6 +845,7 @@ public class LogStorageTest extends EasyMockTest {
         expect(storageUtil.attributeStore.getHostAttributes(host)).andReturn(hostAttributes);
 
         expect(storageUtil.attributeStore.saveHostAttributes(hostAttributes.get())).andReturn(true);
+        eventSink.post(new PubsubEvent.HostAttributesChanged(hostAttributes.get()));
         streamMatcher.expectTransaction(
             Op.saveHostAttributes(new SaveHostAttributes(hostAttributes.get().newBuilder())))
             .andReturn(position);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7ecb718a/src/test/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorageTest.java
b/src/test/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorageTest.java
index a553c56..9393ad7 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorageTest.java
@@ -22,11 +22,17 @@ import com.google.common.collect.ImmutableSet;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
 import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.storage.Op;
 import org.apache.aurora.gen.storage.PruneJobUpdateHistory;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
 import org.apache.aurora.gen.storage.SaveTasks;
 import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.JobStore;
 import org.apache.aurora.scheduler.storage.JobUpdateStore;
@@ -34,6 +40,7 @@ import org.apache.aurora.scheduler.storage.LockStore;
 import org.apache.aurora.scheduler.storage.QuotaStore;
 import org.apache.aurora.scheduler.storage.SchedulerStore;
 import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.easymock.EasyMock;
 import org.junit.Before;
@@ -41,6 +48,8 @@ import org.junit.Test;
 
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class WriteAheadStorageTest extends EasyMockTest {
 
@@ -53,6 +62,7 @@ public class WriteAheadStorageTest extends EasyMockTest {
   private AttributeStore.Mutable attributeStore;
   private JobUpdateStore.Mutable jobUpdateStore;
   private Logger log;
+  private EventSink eventSink;
   private WriteAheadStorage storage;
 
   @Before
@@ -66,6 +76,7 @@ public class WriteAheadStorageTest extends EasyMockTest {
     attributeStore = createMock(AttributeStore.Mutable.class);
     jobUpdateStore = createMock(JobUpdateStore.Mutable.class);
     log = createMock(Logger.class);
+    eventSink = createMock(EventSink.class);
 
     storage = new WriteAheadStorage(
         transactionManager,
@@ -76,7 +87,8 @@ public class WriteAheadStorageTest extends EasyMockTest {
         quotaStore,
         attributeStore,
         jobUpdateStore,
-        log);
+        log,
+        eventSink);
   }
 
   private void expectOp(Op op) {
@@ -128,6 +140,29 @@ public class WriteAheadStorageTest extends EasyMockTest {
     assertEquals(mutated, storage.mutateTasks(query, mutator));
   }
 
+  @Test
+  public void testSaveHostAttributes() {
+    IHostAttributes attributes = IHostAttributes.build(
+        new HostAttributes()
+            .setHost("a")
+            .setMode(MaintenanceMode.DRAINING)
+            .setAttributes(ImmutableSet.of(
+                new Attribute().setName("b").setValues(ImmutableSet.of("1", "2")))));
+
+    expect(attributeStore.saveHostAttributes(attributes)).andReturn(true);
+    expectOp(Op.saveHostAttributes(
+        new SaveHostAttributes().setHostAttributes(attributes.newBuilder())));
+    eventSink.post(new PubsubEvent.HostAttributesChanged(attributes));
+
+    expect(attributeStore.saveHostAttributes(attributes)).andReturn(false);
+
+    control.replay();
+
+    assertTrue(storage.saveHostAttributes(attributes));
+
+    assertFalse(storage.saveHostAttributes(attributes));
+  }
+
   @Test(expected = UnsupportedOperationException.class)
   public void testDeleteAllTasks() {
     control.replay();


Mime
View raw message