aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject aurora git commit: Use the system-wide AsyncExecutor for pubsub events.
Date Thu, 13 Aug 2015 16:00:17 GMT
Repository: aurora
Updated Branches:
  refs/heads/master da48ad20b -> 887ffd2c4


Use the system-wide AsyncExecutor for pubsub events.

Bugs closed: AURORA-1395

Reviewed at https://reviews.apache.org/r/37215/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/887ffd2c
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/887ffd2c
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/887ffd2c

Branch: refs/heads/master
Commit: 887ffd2c485f1f5bc9639f8b2aec7f8a0626f2c0
Parents: da48ad2
Author: Bill Farner <wfarner@apache.org>
Authored: Thu Aug 13 12:00:02 2015 -0400
Committer: Bill Farner <wfarner@apache.org>
Committed: Thu Aug 13 12:00:02 2015 -0400

----------------------------------------------------------------------
 .../apache/aurora/scheduler/app/AppModule.java  |  2 +-
 .../aurora/scheduler/async/AsyncModule.java     |  2 +
 .../scheduler/events/PubsubEventModule.java     | 43 +++-----------------
 .../aurora/scheduler/app/SchedulerIT.java       |  4 ++
 .../scheduler/events/PubsubEventModuleTest.java | 43 ++++++++------------
 .../scheduler/reconciliation/KillRetryTest.java |  6 ++-
 .../scheduling/TaskSchedulerImplTest.java       |  8 +++-
 .../state/MaintenanceControllerImplTest.java    |  7 +++-
 8 files changed, 49 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/887ffd2c/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
index 4cc1127..4eee8e3 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
@@ -111,8 +111,8 @@ public class AppModule extends AbstractModule {
                 .setThriftAPIVersion(THRIFT_API_VERSION)
                 .setStatsUrlPrefix(statsUrlPrefix)));
 
+    install(new PubsubEventModule());
     // Filter layering: notifier filter -> base impl
-    install(new PubsubEventModule(true));
     PubsubEventModule.bindSchedulingFilterDelegate(binder()).to(SchedulingFilterImpl.class);
     bind(SchedulingFilterImpl.class).in(Singleton.class);
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/887ffd2c/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index 8416ea0..a8c6445 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -15,6 +15,7 @@ package org.apache.aurora.scheduler.async;
 
 import java.lang.annotation.Retention;
 import java.lang.annotation.Target;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.logging.Logger;
@@ -83,6 +84,7 @@ public class AsyncModule extends AbstractModule {
     });
     SchedulerServicesModule.addAppStartupServiceBinding(binder()).to(RegisterGauges.class);
 
+    bind(Executor.class).annotatedWith(AsyncExecutor.class).to(GatedDelayExecutor.class);
     bind(DelayExecutor.class).annotatedWith(AsyncExecutor.class).to(GatedDelayExecutor.class);
     bind(FlushableWorkQueue.class).annotatedWith(AsyncExecutor.class).to(GatedDelayExecutor.class);
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/887ffd2c/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
index ccecfdb..84c58e1 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
@@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.events;
 
 import java.util.Set;
 import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -31,19 +30,15 @@ import com.google.common.eventbus.Subscribe;
 import com.google.common.eventbus.SubscriberExceptionContext;
 import com.google.common.eventbus.SubscriberExceptionHandler;
 import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
 import com.google.inject.Provides;
 import com.google.inject.binder.LinkedBindingBuilder;
 import com.google.inject.multibindings.Multibinder;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.args.constraints.Positive;
 import com.twitter.common.stats.StatsProvider;
 
 import org.apache.aurora.scheduler.SchedulerServicesModule;
-import org.apache.aurora.scheduler.base.AsyncUtil;
+import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
 import org.apache.aurora.scheduler.events.NotifyingSchedulingFilter.NotifyDelegate;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
@@ -55,29 +50,18 @@ import static java.util.Objects.requireNonNull;
  */
 public final class PubsubEventModule extends AbstractModule {
 
-  private final boolean async;
   private final Logger log;
 
   @VisibleForTesting
-  static final String PUBSUB_EXECUTOR_QUEUE_GAUGE = "pubsub_executor_queue_size";
-
-  @VisibleForTesting
   static final String EXCEPTIONS_STAT = "event_bus_exceptions";
 
-  @Positive
-  @CmdLine(name = "max_async_event_bus_threads",
-      help = "Maximum number of concurrent threads to allow for the async event processing bus.")
-  private static final Arg<Integer> MAX_ASYNC_EVENT_BUS_THREADS = Arg.create(4);
-
   @VisibleForTesting
-  PubsubEventModule(boolean async, Logger log) {
+  PubsubEventModule(Logger log) {
     this.log = requireNonNull(log);
-    this.async = requireNonNull(async);
   }
 
-  // TODO(wfarner): Remove the async argument and accept an Executor instead.
-  public PubsubEventModule(boolean async) {
-    this(async, Logger.getLogger(PubsubEventModule.class.getName()));
+  public PubsubEventModule() {
+    this(Logger.getLogger(PubsubEventModule.class.getName()));
   }
 
   @VisibleForTesting
@@ -93,22 +77,7 @@ public final class PubsubEventModule extends AbstractModule {
 
   @Provides
   @Singleton
-  EventBus provideEventBus(StatsProvider statsProvider) {
-    Executor executor;
-    if (async) {
-      LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>();
-      statsProvider.makeGauge(PUBSUB_EXECUTOR_QUEUE_GAUGE, executorQueue::size);
-
-      executor = AsyncUtil.loggingExecutor(
-          MAX_ASYNC_EVENT_BUS_THREADS.get(),
-          MAX_ASYNC_EVENT_BUS_THREADS.get(),
-          executorQueue,
-          "AsyncTaskEvents-%d",
-          log);
-    } else {
-      executor = MoreExecutors.sameThreadExecutor();
-    }
-
+  EventBus provideEventBus(@AsyncExecutor Executor executor, StatsProvider statsProvider) {
     final AtomicLong subscriberExceptions = statsProvider.makeCounter(EXCEPTIONS_STAT);
     EventBus eventBus = new AsyncEventBus(
         executor,
@@ -188,7 +157,7 @@ public final class PubsubEventModule extends AbstractModule {
    * @param binder Binder to bind against.
    */
   public static void bind(Binder binder) {
-    binder.install(new PubsubEventModule(true));
+    binder.install(new PubsubEventModule());
   }
 
   private static Multibinder<EventSubscriber> getSubscriberBinder(Binder binder) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/887ffd2c/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index 8d8f8a2..0151dd1 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -77,6 +77,8 @@ import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.gen.storage.Transaction;
 import org.apache.aurora.gen.storage.storageConstants;
 import org.apache.aurora.scheduler.Resources;
+import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.async.FlushableWorkQueue;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.log.Log;
 import org.apache.aurora.scheduler.log.Log.Entry;
@@ -384,6 +386,8 @@ public class SchedulerIT extends BaseZooKeeperTest {
     scheduler.getValue().registered(driver,
         FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(),
         MasterInfo.getDefaultInstance());
+    // Registration is published on the event bus, which will be gated until a flush.
+    injector.getInstance(Key.get(FlushableWorkQueue.class, AsyncExecutor.class)).flush();
 
     awaitSchedulerReady();
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/887ffd2c/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java b/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java
index 4c189c1..983a831 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java
@@ -13,11 +13,14 @@
  */
 package org.apache.aurora.scheduler.events;
 
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.Executor;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -30,6 +33,7 @@ import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
 import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.easymock.EasyMock;
@@ -44,51 +48,39 @@ public class PubsubEventModuleTest extends EasyMockTest {
 
   private FakeStatsProvider statsProvider;
   private Logger logger;
+  private UncaughtExceptionHandler exceptionHandler;
+  private SchedulingFilter schedulingFilter;
 
   @Before
   public void setUp() {
     statsProvider = new FakeStatsProvider();
     logger = createMock(Logger.class);
+    exceptionHandler = createMock(UncaughtExceptionHandler.class);
+    schedulingFilter = createMock(SchedulingFilter.class);
   }
 
   @Test
   public void testHandlesDeadEvent() {
     logger.warning(String.format(PubsubEventModule.DEAD_EVENT_MESSAGE, "hello"));
-    Injector injector = getInjector(false);
 
     control.replay();
 
-    injector.getInstance(EventBus.class).post("hello");
+    getInjector().getInstance(EventBus.class).post("hello");
   }
 
   @Test
-  public void testPubsubQueueGauge() throws Exception {
-    Injector injector = getInjector(true);
+  public void testPubsubExceptionTracking() throws Exception {
+    logger.log(eq(Level.SEVERE), anyString(), EasyMock.<Throwable>anyObject());
 
     control.replay();
 
-    injector.getInstance(Key.get(ExceptionalCommand.class, StartupStage.class)).execute();
-    assertEquals(
-        0L,
-        statsProvider.getLongValue(PubsubEventModule.PUBSUB_EXECUTOR_QUEUE_GAUGE)
-    );
-  }
-
-  @Test
-  public void testPubsubExceptionTracking() throws Exception {
     Injector injector = getInjector(
-        false,
         new AbstractModule() {
           @Override
           protected void configure() {
             PubsubEventModule.bindSubscriber(binder(), ThrowingSubscriber.class);
           }
         });
-
-    logger.log(eq(Level.SEVERE), anyString(), EasyMock.<Throwable>anyObject());
-
-    control.replay();
-
     injector.getInstance(Key.get(ExceptionalCommand.class, StartupStage.class)).execute();
     assertEquals(0L, statsProvider.getLongValue(PubsubEventModule.EXCEPTIONS_STAT));
     injector.getInstance(EventBus.class).post("hello");
@@ -102,20 +94,21 @@ public class PubsubEventModuleTest extends EasyMockTest {
     }
   }
 
-  public Injector getInjector(boolean isAsync, Module... additionalModules) {
+  public Injector getInjector(Module... additionalModules) {
     return Guice.createInjector(
         new LifecycleModule(),
-        new PubsubEventModule(isAsync, logger),
+        new PubsubEventModule(logger),
         new SchedulerServicesModule(),
         new AbstractModule() {
           @Override
           protected void configure() {
-            bind(Thread.UncaughtExceptionHandler.class)
-                .toInstance(createMock(Thread.UncaughtExceptionHandler.class));
+            bind(Executor.class).annotatedWith(AsyncExecutor.class)
+                .toInstance(MoreExecutors.sameThreadExecutor());
+
+            bind(UncaughtExceptionHandler.class).toInstance(exceptionHandler);
 
             bind(StatsProvider.class).toInstance(statsProvider);
-            PubsubEventModule.bindSchedulingFilterDelegate(binder())
-                .toInstance(createMock(SchedulingFilter.class));
+            PubsubEventModule.bindSchedulingFilterDelegate(binder()).toInstance(schedulingFilter);
             for (Module module : additionalModules) {
               install(module);
             }

http://git-wip-us.apache.org/repos/asf/aurora/blob/887ffd2c/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java b/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java
index 957cbd0..0962ed9 100644
--- a/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java
@@ -14,10 +14,12 @@
 package org.apache.aurora.scheduler.reconciliation;
 
 import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.Executor;
 
 import javax.inject.Singleton;
 
 import com.google.common.eventbus.EventBus;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -73,7 +75,7 @@ public class KillRetryTest extends EasyMockTest {
 
     Injector injector = Guice.createInjector(
         new LifecycleModule(),
-        new PubsubEventModule(false),
+        new PubsubEventModule(),
         new AbstractModule() {
           @Override
           protected void configure() {
@@ -86,6 +88,8 @@ public class KillRetryTest extends EasyMockTest {
             bind(StatsProvider.class).toInstance(statsProvider);
             bind(UncaughtExceptionHandler.class)
                 .toInstance(createMock(UncaughtExceptionHandler.class));
+            bind(Executor.class).annotatedWith(AsyncExecutor.class)
+                .toInstance(MoreExecutors.sameThreadExecutor());
           }
         }
     );

http://git-wip-us.apache.org/repos/asf/aurora/blob/887ffd2c/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
index 492334b..102e8d3 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
@@ -14,11 +14,13 @@
 package org.apache.aurora.scheduler.scheduling;
 
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -28,6 +30,7 @@ import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.Clock;
 
 import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
@@ -94,10 +97,13 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
   private Injector getInjector(final Storage storageImpl) {
     return Guice.createInjector(
-        new PubsubEventModule(false),
+        new PubsubEventModule(),
         new AbstractModule() {
           @Override
           protected void configure() {
+
+            bind(Executor.class).annotatedWith(AsyncExecutor.class)
+                .toInstance(MoreExecutors.sameThreadExecutor());
             bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).toInstance(reservations);
             bind(TaskScheduler.class).to(TaskSchedulerImpl.class);
             bind(Preemptor.class).toInstance(preemptor);

http://git-wip-us.apache.org/repos/asf/aurora/blob/887ffd2c/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
index ba2edd8..abeaa49 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
@@ -14,9 +14,11 @@
 package org.apache.aurora.scheduler.state;
 
 import java.util.Set;
+import java.util.concurrent.Executor;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -31,6 +33,7 @@ import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
@@ -71,7 +74,7 @@ public class MaintenanceControllerImplTest extends EasyMockTest {
     stateManager = createMock(StateManager.class);
 
     Injector injector = Guice.createInjector(
-        new PubsubEventModule(false),
+        new PubsubEventModule(),
         new AbstractModule() {
           @Override
           protected void configure() {
@@ -79,6 +82,8 @@ public class MaintenanceControllerImplTest extends EasyMockTest {
             bind(Storage.class).toInstance(storageUtil.storage);
             bind(StateManager.class).toInstance(stateManager);
             bind(StatsProvider.class).toInstance(new FakeStatsProvider());
+            bind(Executor.class).annotatedWith(AsyncExecutor.class)
+                .toInstance(MoreExecutors.sameThreadExecutor());
           }
         });
     maintenance = injector.getInstance(MaintenanceController.class);


Mime
View raw message