aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dmclaugh...@apache.org
Subject aurora git commit: Refactor veto logic to use direct method calls as opposed to pubsub events.
Date Wed, 25 Oct 2017 17:22:05 GMT
Repository: aurora
Updated Branches:
  refs/heads/master 96c834a24 -> 418813cf1


Refactor veto logic to use direct method calls as opposed to pubsub events.

NotifyingSchedulingFilter currently publishes veto events to be consumed by various metadata
classes (NearestFit and TaskVars). These veto events cause a lot of object allocations/async
tasks. We can reduce the number of objects made by directly calling methods and not using
pubsub events.

Reviewed at https://reviews.apache.org/r/63236/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/418813cf
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/418813cf
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/418813cf

Branch: refs/heads/master
Commit: 418813cf128503836429bbf2bdf0b90221e97e1c
Parents: 96c834a
Author: Jordan Ly <jordan.ly8@gmail.com>
Authored: Wed Oct 25 10:21:55 2017 -0700
Committer: David McLaughlin <david@dmclaughlin.com>
Committed: Wed Oct 25 10:21:55 2017 -0700

----------------------------------------------------------------------
 .../org/apache/aurora/scheduler/TaskVars.java   | 11 +++--
 .../apache/aurora/scheduler/app/AppModule.java  | 10 ++++-
 .../events/NotifyingSchedulingFilter.java       | 22 ++++++---
 .../aurora/scheduler/events/PubsubEvent.java    | 47 --------------------
 .../scheduler/events/PubsubEventModule.java     | 16 -------
 .../aurora/scheduler/metadata/NearestFit.java   | 19 ++++----
 .../apache/aurora/scheduler/TaskVarsTest.java   | 17 +++----
 .../events/NotifyingSchedulingFilterTest.java   | 17 ++++---
 .../scheduler/events/PubsubEventModuleTest.java |  4 --
 .../aurora/scheduler/http/PendingTasksTest.java |  6 +--
 .../scheduler/metadata/NearestFitTest.java      |  5 +--
 11 files changed, 57 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/418813cf/src/main/java/org/apache/aurora/scheduler/TaskVars.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskVars.java b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
index 676dfd9..3911626 100644
--- a/src/main/java/org/apache/aurora/scheduler/TaskVars.java
+++ b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
@@ -14,6 +14,7 @@
 package org.apache.aurora.scheduler;
 
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.inject.Inject;
@@ -40,7 +41,6 @@ import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
-import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoGroup;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType;
@@ -56,7 +56,7 @@ import static java.util.Objects.requireNonNull;
 /**
  * A container that tracks and exports stat counters for tasks.
  */
-class TaskVars extends AbstractIdleService implements EventSubscriber {
+public class TaskVars extends AbstractIdleService implements EventSubscriber {
   private static final Logger LOG = LoggerFactory.getLogger(TaskVars.class);
   private static final ImmutableSet<ScheduleStatus> TRACKED_JOB_STATES =
       ImmutableSet.of(ScheduleStatus.LOST, ScheduleStatus.FAILED);
@@ -221,13 +221,12 @@ class TaskVars extends AbstractIdleService implements EventSubscriber {
     }
   }
 
-  @Subscribe
-  public void taskVetoed(Vetoed event) {
-    VetoGroup vetoGroup = Veto.identifyGroup(event.getVetoes());
+  public void taskVetoed(Set<Veto> vetoes) {
+    VetoGroup vetoGroup = Veto.identifyGroup(vetoes);
     if (vetoGroup != VetoGroup.EMPTY) {
       counters.getUnchecked(VETO_GROUPS_TO_COUNTERS.get(vetoGroup)).increment();
     }
-    for (Veto veto : event.getVetoes()) {
+    for (Veto veto : vetoes) {
       counters.getUnchecked(VETO_TYPE_TO_COUNTERS.get(veto.getVetoType())).increment();
     }
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/418813cf/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
index 40579db..3204cca 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
@@ -40,7 +40,9 @@ import org.apache.aurora.scheduler.config.CliOptions;
 import org.apache.aurora.scheduler.config.validators.PositiveNumber;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.ConfigurationManagerSettings;
+import org.apache.aurora.scheduler.events.NotifyingSchedulingFilter;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
 import org.apache.aurora.scheduler.http.JettyServerModule;
 import org.apache.aurora.scheduler.mesos.SchedulerDriverModule;
@@ -160,11 +162,15 @@ public class AppModule extends AbstractModule {
     GuiceUtils.bindExceptionTrap(binder(), Scheduler.class);
 
     bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
-    install(new PubsubEventModule());
     // Filter layering: notifier filter -> base impl
-    PubsubEventModule.bindSchedulingFilterDelegate(binder()).to(SchedulingFilterImpl.class);
+    bind(SchedulingFilter.class).to(NotifyingSchedulingFilter.class);
+    bind(NotifyingSchedulingFilter.class).in(Singleton.class);
+    bind(SchedulingFilter.class)
+        .annotatedWith(NotifyingSchedulingFilter.NotifyDelegate.class)
+        .to(SchedulingFilterImpl.class);
     bind(SchedulingFilterImpl.class).in(Singleton.class);
 
+    install(new PubsubEventModule());
     install(new AsyncModule(options.async));
     install(new OffersModule(options));
     install(new PruningModule(options.pruning));

http://git-wip-us.apache.org/repos/asf/aurora/blob/418813cf/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
index f6c759f..41f85e9 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
@@ -20,9 +20,11 @@ import java.util.Set;
 import javax.inject.Inject;
 import javax.inject.Qualifier;
 
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import org.apache.aurora.scheduler.TaskVars;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.metadata.NearestFit;
 
 import static java.lang.annotation.ElementType.FIELD;
 import static java.lang.annotation.ElementType.METHOD;
@@ -31,9 +33,10 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
 
 /**
- * A decorating scheduling filter that sends an event when a scheduling assignment is vetoed.
+ * A decorating scheduling filter that notifies metadata classes when a scheduling assignment is
+ * vetoed.
  */
-class NotifyingSchedulingFilter implements SchedulingFilter {
+public class NotifyingSchedulingFilter implements SchedulingFilter {
 
   /**
    * Binding annotation that the underlying {@link SchedulingFilter} must be bound with.
@@ -43,22 +46,27 @@ class NotifyingSchedulingFilter implements SchedulingFilter {
   public @interface NotifyDelegate { }
 
   private final SchedulingFilter delegate;
-  private final EventSink eventSink;
+  private final NearestFit nearestFit;
+  private final TaskVars taskVars;
 
   @Inject
   NotifyingSchedulingFilter(
       @NotifyDelegate SchedulingFilter delegate,
-      EventSink eventSink) {
+      NearestFit nearestFit,
+      TaskVars taskVars) {
 
     this.delegate = requireNonNull(delegate);
-    this.eventSink = requireNonNull(eventSink);
+    this.nearestFit = requireNonNull(nearestFit);
+    this.taskVars = requireNonNull(taskVars);
   }
 
+  @Timed("notifying_scheduling_filter")
   @Override
   public Set<Veto> filter(UnusedResource resource, ResourceRequest request) {
     Set<Veto> vetoes = delegate.filter(resource, request);
     if (!vetoes.isEmpty()) {
-      eventSink.post(new Vetoed(TaskGroupKey.from(request.getTask()), vetoes));
+      nearestFit.vetoed(TaskGroupKey.from(request.getTask()), vetoes);
+      taskVars.taskVetoed(vetoes);
     }
 
     return vetoes;

http://git-wip-us.apache.org/repos/asf/aurora/blob/418813cf/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
index 0637eb7..73fcdd0 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
@@ -21,9 +21,7 @@ import com.google.common.base.Optional;
 import com.google.gson.Gson;
 
 import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.mesos.v1.Protos;
@@ -202,51 +200,6 @@ public interface PubsubEvent {
     }
   }
 
-  /**
-   * Event sent when a scheduling assignment was vetoed.
-   */
-  class Vetoed implements PubsubEvent {
-    private final TaskGroupKey groupKey;
-    private final Set<Veto> vetoes;
-
-    public Vetoed(TaskGroupKey groupKey, Set<Veto> vetoes) {
-      this.groupKey = requireNonNull(groupKey);
-      this.vetoes = requireNonNull(vetoes);
-    }
-
-    public TaskGroupKey getGroupKey() {
-      return groupKey;
-    }
-
-    public Set<Veto> getVetoes() {
-      return vetoes;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof Vetoed)) {
-        return false;
-      }
-
-      Vetoed other = (Vetoed) o;
-      return Objects.equals(groupKey, other.groupKey)
-          && Objects.equals(vetoes, other.vetoes);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(groupKey, vetoes);
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .add("groupKey", groupKey)
-          .add("vetoes", vetoes)
-          .toString();
-    }
-  }
-
   class DriverRegistered implements PubsubEvent {
     @Override
     public boolean equals(Object o) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/418813cf/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
index 0ca7e23..6758f8c 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
@@ -29,15 +29,12 @@ import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
 import com.google.inject.Provides;
-import com.google.inject.binder.LinkedBindingBuilder;
 import com.google.inject.multibindings.Multibinder;
 
 import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.scheduler.SchedulerServicesModule;
 import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
-import org.apache.aurora.scheduler.events.NotifyingSchedulingFilter.NotifyDelegate;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -130,19 +127,6 @@ public final class PubsubEventModule extends AbstractModule {
   }
 
   /**
-   * Gets a binding builder that must be used to wire up the scheduling filter implementation
-   * that backs the delegating scheduling filter that fires pubsub events.
-   *
-   * @param binder Binder to create a binding against.
-   * @return A linked binding builder that may be used to wire up the scheduling filter.
-   */
-  public static LinkedBindingBuilder<SchedulingFilter> bindSchedulingFilterDelegate(Binder binder) {
-    binder.bind(SchedulingFilter.class).to(NotifyingSchedulingFilter.class);
-    binder.bind(NotifyingSchedulingFilter.class).in(Singleton.class);
-    return binder.bind(SchedulingFilter.class).annotatedWith(NotifyDelegate.class);
-  }
-
-  /**
    * Binds a task event module.
    *
    * @param binder Binder to bind against.

http://git-wip-us.apache.org/repos/asf/aurora/blob/418813cf/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java b/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java
index 44cac6f..b077864 100644
--- a/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java
+++ b/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java
@@ -16,7 +16,6 @@ package org.apache.aurora.scheduler.metadata;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -42,7 +41,6 @@ import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
-import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.scheduling.TaskGroup;
 
@@ -83,7 +81,7 @@ public class NearestFit implements EventSubscriber {
    * @return The nearest fit vetoes for the given task.  This will return an empty set if
    *         no vetoes have been recorded for the task.
    */
-  public synchronized ImmutableSet<Veto> getNearestFit(TaskGroupKey groupKey) {
+  public ImmutableSet<Veto> getNearestFit(TaskGroupKey groupKey) {
     Fit fit = fitByGroupKey.getIfPresent(groupKey);
     return (fit == null) ? NO_VETO : fit.vetoes;
   }
@@ -94,7 +92,7 @@ public class NearestFit implements EventSubscriber {
    * @param deletedEvent Task deleted event.
    */
   @Subscribe
-  public synchronized void remove(TasksDeleted deletedEvent) {
+  public void remove(TasksDeleted deletedEvent) {
     fitByGroupKey.invalidateAll(Iterables.transform(deletedEvent.getTasks(), Functions.compose(
         TaskGroupKey::from,
         Tasks::getConfig)));
@@ -107,21 +105,20 @@ public class NearestFit implements EventSubscriber {
    * @param event Task state change.
    */
   @Subscribe
-  public synchronized void stateChanged(TaskStateChange event) {
+  public void stateChanged(TaskStateChange event) {
     if (event.isTransition() && event.getOldState().get() == ScheduleStatus.PENDING) {
       fitByGroupKey.invalidate(TaskGroupKey.from(event.getTask().getAssignedTask().getTask()));
     }
   }
 
   /**
-   * Records a task veto event.
+   * Records vetoes for a {@link TaskGroupKey}.
    *
-   * @param vetoEvent Veto event.
+   * @param taskGroupKey The TaskGroupKey that is being vetoed.
+   * @param vetoes The vetoes.
    */
-  @Subscribe
-  public synchronized void vetoed(Vetoed vetoEvent) {
-    Objects.requireNonNull(vetoEvent);
-    fitByGroupKey.getUnchecked(vetoEvent.getGroupKey()).maybeUpdate(vetoEvent.getVetoes());
+  public void vetoed(TaskGroupKey taskGroupKey, Set<Veto> vetoes) {
+    fitByGroupKey.getUnchecked(taskGroupKey).maybeUpdate(vetoes);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/aurora/blob/418813cf/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java b/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
index a4741c3..b1d9f8a 100644
--- a/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
@@ -28,10 +28,8 @@ import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
@@ -123,10 +121,8 @@ public class TaskVarsTest extends EasyMockTest {
         task.getStatus()));
   }
 
-  private void applyVeto(IScheduledTask task, Veto... vetoes) {
-    vars.taskVetoed(new PubsubEvent.Vetoed(
-        TaskGroupKey.from(task.getAssignedTask().getTask()),
-        ImmutableSet.copyOf(vetoes)));
+  private void applyVeto(Veto... vetoes) {
+    vars.taskVetoed(ImmutableSet.copyOf(vetoes));
   }
 
   private void schedulerActivated(IScheduledTask... initialTasks) {
@@ -222,9 +218,7 @@ public class TaskVarsTest extends EasyMockTest {
     replayAndBuild();
     schedulerActivated();
 
-    applyVeto(
-        makeTask(JOB_A, PENDING),
-        Veto.insufficientResources("ram", 500),
+    applyVeto(Veto.insufficientResources("ram", 500),
         Veto.insufficientResources("cpu", 500));
 
     assertEquals(1, getValue(STATIC_COUNTER));
@@ -240,7 +234,7 @@ public class TaskVarsTest extends EasyMockTest {
     replayAndBuild();
     schedulerActivated();
 
-    applyVeto(makeTask(JOB_A, PENDING), Veto.unsatisfiedLimit("constraint"));
+    applyVeto(Veto.unsatisfiedLimit("constraint"));
     assertEquals(1, getValue(DYNAMIC_COUNTER));
     assertEquals(1, getValue(LIMIT_NOT_SATISFIED_COUNTER));
   }
@@ -255,8 +249,7 @@ public class TaskVarsTest extends EasyMockTest {
     replayAndBuild();
     schedulerActivated();
 
-    applyVeto(makeTask(JOB_A, PENDING),
-        Veto.unsatisfiedLimit("constraint"),
+    applyVeto(Veto.unsatisfiedLimit("constraint"),
         Veto.insufficientResources("ram", 500));
 
     assertEquals(1, getValue(MIXED_COUNTER));

http://git-wip-us.apache.org/repos/asf/aurora/blob/418813cf/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
index bf9c2b4..64d7a44 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
@@ -21,13 +21,14 @@ import org.apache.aurora.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.TaskVars;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.metadata.NearestFit;
 import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
@@ -59,21 +60,25 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
   private static final Veto VETO_2 = Veto.insufficientResources("ram", 2);
 
   private SchedulingFilter filter;
-  private EventSink eventSink;
+  private NearestFit nearestFit;
+  private TaskVars taskVars;
   private SchedulingFilter delegate;
 
   @Before
   public void setUp() {
     delegate = createMock(SchedulingFilter.class);
-    eventSink = createMock(EventSink.class);
-    filter = new NotifyingSchedulingFilter(delegate, eventSink);
+    nearestFit = createMock(NearestFit.class);
+    taskVars = createMock(TaskVars.class);
+
+    filter = new NotifyingSchedulingFilter(delegate, nearestFit, taskVars);
   }
 
   @Test
-  public void testEvents() {
+  public void testNotifies() {
     Set<Veto> vetoes = ImmutableSet.of(VETO_1, VETO_2);
     expect(delegate.filter(RESOURCE, REQUEST)).andReturn(vetoes);
-    eventSink.post(new Vetoed(GROUP_KEY, vetoes));
+    nearestFit.vetoed(GROUP_KEY, vetoes);
+    taskVars.taskVetoed(vetoes);
 
     control.replay();
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/418813cf/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java b/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java
index 90c0b86..f656b27 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java
@@ -32,7 +32,6 @@ import org.apache.aurora.scheduler.AppStartup;
 import org.apache.aurora.scheduler.SchedulerServicesModule;
 import org.apache.aurora.scheduler.app.LifecycleModule;
 import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
-import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.easymock.EasyMock;
 import org.junit.Before;
@@ -47,14 +46,12 @@ public class PubsubEventModuleTest extends EasyMockTest {
   private FakeStatsProvider statsProvider;
   private Logger logger;
   private UncaughtExceptionHandler exceptionHandler;
-  private SchedulingFilter schedulingFilter;
 
   @Before
   public void setUp() {
     statsProvider = new FakeStatsProvider();
     logger = createMock(Logger.class);
     exceptionHandler = createMock(UncaughtExceptionHandler.class);
-    schedulingFilter = createMock(SchedulingFilter.class);
   }
 
   @Test
@@ -109,7 +106,6 @@ public class PubsubEventModuleTest extends EasyMockTest {
             bind(UncaughtExceptionHandler.class).toInstance(exceptionHandler);
 
             bind(StatsProvider.class).toInstance(statsProvider);
-            PubsubEventModule.bindSchedulingFilterDelegate(binder()).toInstance(schedulingFilter);
             for (Module module : additionalModules) {
               install(module);
             }

http://git-wip-us.apache.org/repos/asf/aurora/blob/418813cf/src/test/java/org/apache/aurora/scheduler/http/PendingTasksTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/http/PendingTasksTest.java b/src/test/java/org/apache/aurora/scheduler/http/PendingTasksTest.java
index 96bdded..a69bf4a 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/PendingTasksTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/PendingTasksTest.java
@@ -152,16 +152,16 @@ public class PendingTasksTest extends EasyMockTest {
     ImmutableSet<Veto> vetoes = ImmutableSet.<Veto>builder()
         .add(Veto.insufficientResources("CPU", 1))
         .add(Veto.insufficientResources("RAM", 1)).build();
-    nearestFit.vetoed(new PubsubEvent.Vetoed(taskGroupKey0, vetoes));
+    nearestFit.vetoed(taskGroupKey0, vetoes);
     // Creating vetoes for CPU and DISK, corresponding to task1.
     ImmutableSet<Veto> vetoes1 = ImmutableSet.<Veto>builder()
         .add(Veto.insufficientResources("CPU", 1))
         .add(Veto.insufficientResources("DISK", 1)).build();
-    nearestFit.vetoed(new PubsubEvent.Vetoed(taskGroupKey1, vetoes1));
+    nearestFit.vetoed(taskGroupKey1, vetoes1);
       // Creating vetoes for CPU, corresponding to task2.
     ImmutableSet<Veto> vetoes2 = ImmutableSet.<Veto>builder()
         .add(Veto.insufficientResources("CPU", 1)).build();
-    nearestFit.vetoed(new PubsubEvent.Vetoed(taskGroupKey2, vetoes2));
+    nearestFit.vetoed(taskGroupKey2, vetoes2);
     replay(pendingTaskGroups);
 
     // Testing.

http://git-wip-us.apache.org/repos/asf/aurora/blob/418813cf/src/test/java/org/apache/aurora/scheduler/metadata/NearestFitTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/metadata/NearestFitTest.java b/src/test/java/org/apache/aurora/scheduler/metadata/NearestFitTest.java
index e170d62..437dabe 100644
--- a/src/test/java/org/apache/aurora/scheduler/metadata/NearestFitTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/metadata/NearestFitTest.java
@@ -34,7 +34,6 @@ import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
-import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.http.TestUtils;
 import org.apache.aurora.scheduler.scheduling.TaskGroup;
@@ -139,7 +138,7 @@ public class NearestFitTest {
     pendingTaskGroups.add(taskGroup);
 
     // Creating vetoes for CPU and RAM.
-    nearest.vetoed(new Vetoed(taskGroupKey, vetoes(SEVERITY_4_CPU, SEVERITY_4_RAM)));
+    nearest.vetoed(taskGroupKey, vetoes(SEVERITY_4_CPU, SEVERITY_4_RAM));
 
     // Testing.
     Map<TaskGroupKey, List<String>> mimicPendingReasons = new LinkedHashMap<>();
@@ -155,7 +154,7 @@ public class NearestFitTest {
   }
 
   private void vetoed(Veto... vetoes) {
-    nearest.vetoed(new Vetoed(GROUP_KEY, ImmutableSet.copyOf(vetoes)));
+    nearest.vetoed(GROUP_KEY, ImmutableSet.copyOf(vetoes));
   }
 
   private void assertNearest(Veto... vetoes) {


Mime
View raw message