aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject incubator-aurora git commit: Setting the max thread limit on AsyncEventBus.
Date Thu, 13 Nov 2014 20:26:22 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master de5a9b736 -> 1e16eef78


Setting the max thread limit on AsyncEventBus.

Bugs closed: AURORA-926

Reviewed at https://reviews.apache.org/r/27947/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/1e16eef7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/1e16eef7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/1e16eef7

Branch: refs/heads/master
Commit: 1e16eef78809ab5604f2ea1d49707f7836f8c24c
Parents: de5a9b7
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Thu Nov 13 12:26:07 2014 -0800
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Thu Nov 13 12:26:07 2014 -0800

----------------------------------------------------------------------
 .../scheduler/events/PubsubEventModule.java     | 82 ++++++++++++++++++--
 .../scheduler/events/PubsubEventModuleTest.java | 53 ++++++++++---
 2 files changed, 118 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1e16eef7/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
index 24cd750..9e8ba65 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
@@ -13,15 +13,21 @@
  */
 package org.apache.aurora.scheduler.events;
 
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
 import java.util.Set;
 import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
+import javax.inject.Qualifier;
 import javax.inject.Singleton;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Supplier;
 import com.google.common.eventbus.AsyncEventBus;
 import com.google.common.eventbus.DeadEvent;
 import com.google.common.eventbus.EventBus;
@@ -30,15 +36,24 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
+import com.google.inject.TypeLiteral;
 import com.google.inject.binder.LinkedBindingBuilder;
 import com.google.inject.multibindings.Multibinder;
 import com.twitter.common.application.modules.LifecycleModule;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.args.constraints.Positive;
 import com.twitter.common.base.Command;
+import com.twitter.common.stats.StatsProvider;
 
 import org.apache.aurora.scheduler.events.NotifyingSchedulingFilter.NotifyDelegate;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
 
 /**
@@ -50,6 +65,18 @@ public final class PubsubEventModule extends AbstractModule {
   private final Logger log;
 
   @VisibleForTesting
+  static final String PUBSUB_EXECUTOR_QUEUE_GAUGE = "pubsub_executor_queue_size";
+
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  private @interface PubsubExecutorQueue { }
+
+  @Positive
+  @CmdLine(name = "max_async_event_bus_threads",
+      help = "Maximum number of concurrent threads to allow for the async event processing bus.")
+  private static final Arg<Integer> MAX_ASYNC_EVENT_BUS_THREADS = Arg.create(4);
+
+  @VisibleForTesting
   PubsubEventModule(boolean async, Logger log) {
     this.log = requireNonNull(log);
     this.async = requireNonNull(async);
@@ -66,21 +93,29 @@ public final class PubsubEventModule extends AbstractModule {
   protected void configure() {
     final Executor executor;
     if (async) {
-      executor = Executors.newCachedThreadPool(
+      LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>();
+      bind(new TypeLiteral<LinkedBlockingQueue<Runnable>>() { })
+          .annotatedWith(PubsubExecutorQueue.class)
+          .toInstance(executorQueue);
+
+      executor = new ThreadPoolExecutor(
+          MAX_ASYNC_EVENT_BUS_THREADS.get(),
+          MAX_ASYNC_EVENT_BUS_THREADS.get(),
+          0L,
+          TimeUnit.MILLISECONDS,
+          executorQueue,
           new ThreadFactoryBuilder()
               .setDaemon(true)
               .setNameFormat("AsyncTaskEvents-%d")
               .build());
+
+      LifecycleModule.bindStartupAction(binder(), RegisterGauges.class);
     } else {
       executor = MoreExecutors.sameThreadExecutor();
     }
 
     final EventBus eventBus = new AsyncEventBus("AsyncTaskEvents", executor);
-    eventBus.register(new Object() {
-      @Subscribe public void logDeadEvent(DeadEvent event) {
-        log.warning(String.format(DEAD_EVENT_MESSAGE, event.getEvent()));
-      }
-    });
+    eventBus.register(new DeadEventHandler());
     bind(EventBus.class).toInstance(eventBus);
 
     EventSink eventSink = new EventSink() {
@@ -96,6 +131,39 @@ public final class PubsubEventModule extends AbstractModule {
     LifecycleModule.bindStartupAction(binder(), RegisterSubscribers.class);
   }
 
+  private class DeadEventHandler {
+    @Subscribe
+    public void logDeadEvent(DeadEvent event) {
+      log.warning(String.format(DEAD_EVENT_MESSAGE, event.getEvent()));
+    }
+  }
+
+  static class RegisterGauges implements Command {
+    private final StatsProvider statsProvider;
+    private final LinkedBlockingQueue<Runnable> pubsubQueue;
+
+    @Inject
+    RegisterGauges(
+        StatsProvider statsProvider,
+        @PubsubExecutorQueue LinkedBlockingQueue<Runnable> pubsubQueue) {
+
+      this.statsProvider = requireNonNull(statsProvider);
+      this.pubsubQueue = requireNonNull(pubsubQueue);
+    }
+
+    @Override
+    public void execute() throws RuntimeException {
+      statsProvider.makeGauge(
+          PUBSUB_EXECUTOR_QUEUE_GAUGE,
+          new Supplier<Integer>() {
+            @Override
+            public Integer get() {
+              return pubsubQueue.size();
+            }
+          });
+    }
+  }
+
   static class RegisterSubscribers implements Command {
     private final EventBus eventBus;
     private final Set<EventSubscriber> subscribers;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1e16eef7/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java b/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java
index af3f0e3..0e0fabb 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java
@@ -15,41 +15,74 @@ package org.apache.aurora.scheduler.events;
 
 import java.util.logging.Logger;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.eventbus.EventBus;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
+import com.google.inject.Key;
+
+import com.twitter.common.application.StartupStage;
+import com.twitter.common.application.modules.LifecycleModule;
+import com.twitter.common.base.ExceptionalCommand;
+import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
 public class PubsubEventModuleTest extends EasyMockTest {
 
+  private FakeStatsProvider statsProvider;
   private Logger logger;
-  private Injector injector;
 
   @Before
   public void setUp() {
+    statsProvider = new FakeStatsProvider();
     logger = createMock(Logger.class);
-    injector = Guice.createInjector(
-        new PubsubEventModule(false, logger),
-        new AbstractModule() {
-          @Override
-          protected void configure() {
-            PubsubEventModule.bindSchedulingFilterDelegate(binder())
-                .toInstance(createMock(SchedulingFilter.class));
-          }
-        });
   }
 
   @Test
   public void testHandlesDeadEvent() {
     logger.warning(String.format(PubsubEventModule.DEAD_EVENT_MESSAGE, "hello"));
+    Injector injector = getInjector(false);
 
     control.replay();
 
     injector.getInstance(EventBus.class).post("hello");
   }
+
+  @Test
+  public void testPubsubQueueGauge() throws Exception {
+    Injector injector = getInjector(true);
+
+    control.replay();
+
+    injector.getInstance(Key.get(ExceptionalCommand.class, StartupStage.class)).execute();
+    assertEquals(
+        ImmutableMap.of(PubsubEventModule.PUBSUB_EXECUTOR_QUEUE_GAUGE, 0),
+        statsProvider.getAllValues()
+    );
+  }
+
+  public Injector getInjector(boolean isAsync) {
+    return Guice.createInjector(
+        new LifecycleModule(),
+        new PubsubEventModule(isAsync, logger),
+        new AbstractModule() {
+          @Override
+          protected void configure() {
+            bind(Thread.UncaughtExceptionHandler.class)
+                .toInstance(createMock(Thread.UncaughtExceptionHandler.class));
+
+            bind(StatsProvider.class).toInstance(statsProvider);
+            PubsubEventModule.bindSchedulingFilterDelegate(binder())
+                .toInstance(createMock(SchedulingFilter.class));
+          }
+        });
+  }
 }


Mime
View raw message