aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject aurora git commit: Add a stat that tracks uncaught exceptions in pubsub event handlers.
Date Thu, 09 Jul 2015 18:22:57 GMT
Repository: aurora
Updated Branches:
  refs/heads/master 8efcd0698 -> b86b293e6


Add a stat that tracks uncaught exceptions in pubsub event handlers.

Reviewed at https://reviews.apache.org/r/36329/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/b86b293e
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/b86b293e
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/b86b293e

Branch: refs/heads/master
Commit: b86b293e626b1346f5adff9e6d5dd638928014b1
Parents: 8efcd06
Author: Bill Farner <wfarner@apache.org>
Authored: Thu Jul 9 11:22:39 2015 -0700
Committer: Bill Farner <wfarner@apache.org>
Committed: Thu Jul 9 11:22:39 2015 -0700

----------------------------------------------------------------------
 .../scheduler/events/PubsubEventModule.java     | 94 ++++++++------------
 .../aurora/scheduler/async/KillRetryTest.java   |  5 +-
 .../scheduler/async/TaskSchedulerImplTest.java  |  3 +-
 .../scheduler/events/PubsubEventModuleTest.java | 46 +++++++++-
 .../state/MaintenanceControllerImplTest.java    |  3 +
 5 files changed, 88 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/b86b293e/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
index 82d479e..c85979d 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
@@ -13,35 +13,33 @@
  */
 package org.apache.aurora.scheduler.events;
 
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
-import javax.inject.Qualifier;
 import javax.inject.Singleton;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Supplier;
 import com.google.common.eventbus.AsyncEventBus;
 import com.google.common.eventbus.DeadEvent;
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
+import com.google.common.eventbus.SubscriberExceptionContext;
+import com.google.common.eventbus.SubscriberExceptionHandler;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
-import com.google.inject.TypeLiteral;
+import com.google.inject.Provides;
 import com.google.inject.binder.LinkedBindingBuilder;
 import com.google.inject.multibindings.Multibinder;
-import com.twitter.common.application.modules.LifecycleModule;
 import com.twitter.common.args.Arg;
 import com.twitter.common.args.CmdLine;
 import com.twitter.common.args.constraints.Positive;
-import com.twitter.common.base.Command;
 import com.twitter.common.stats.StatsProvider;
 
 import org.apache.aurora.scheduler.SchedulerServicesModule;
@@ -50,10 +48,6 @@ import org.apache.aurora.scheduler.events.NotifyingSchedulingFilter.NotifyDelega
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
 
 /**
@@ -67,9 +61,8 @@ public final class PubsubEventModule extends AbstractModule {
   @VisibleForTesting
   static final String PUBSUB_EXECUTOR_QUEUE_GAUGE = "pubsub_executor_queue_size";
 
-  @Qualifier
-  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-  private @interface PubsubExecutorQueue { }
+  @VisibleForTesting
+  static final String EXCEPTIONS_STAT = "event_bus_exceptions";
 
   @Positive
   @CmdLine(name = "max_async_event_bus_threads",
@@ -91,12 +84,19 @@ public final class PubsubEventModule extends AbstractModule {
 
   @Override
   protected void configure() {
-    final Executor executor;
+    // Ensure at least an empty binding is present.
+    getSubscriberBinder(binder());
+    // TODO(ksweeney): Would this be better as a scheduler active service?
+    SchedulerServicesModule.addAppStartupServiceBinding(binder()).to(RegisterSubscribers.class);
+  }
+
+  @Provides
+  @Singleton
+  EventBus provideEventBus(StatsProvider statsProvider) {
+    Executor executor;
     if (async) {
       LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>();
-      bind(new TypeLiteral<LinkedBlockingQueue<Runnable>>() { })
-          .annotatedWith(PubsubExecutorQueue.class)
-          .toInstance(executorQueue);
+      statsProvider.makeGauge(PUBSUB_EXECUTOR_QUEUE_GAUGE, executorQueue::size);
 
       executor = AsyncUtil.loggingExecutor(
           MAX_ASYNC_EVENT_BUS_THREADS.get(),
@@ -104,28 +104,38 @@ public final class PubsubEventModule extends AbstractModule {
           executorQueue,
           "AsyncTaskEvents-%d",
           log);
-
-      LifecycleModule.bindStartupAction(binder(), RegisterGauges.class);
     } else {
       executor = MoreExecutors.sameThreadExecutor();
     }
 
-    final EventBus eventBus = new AsyncEventBus("AsyncTaskEvents", executor);
+    final AtomicLong subscriberExceptions = statsProvider.makeCounter(EXCEPTIONS_STAT);
+    EventBus eventBus = new AsyncEventBus(
+        executor,
+        new SubscriberExceptionHandler() {
+          @Override
+          public void handleException(Throwable exception, SubscriberExceptionContext context) {
+            subscriberExceptions.incrementAndGet();
+            log.log(
+                Level.SEVERE,
+                "Failed to dispatch event to " + context.getSubscriberMethod() + ": " + exception,
+                exception);
+          }
+        }
+    );
+
     eventBus.register(new DeadEventHandler());
-    bind(EventBus.class).toInstance(eventBus);
+    return eventBus;
+  }
 
-    EventSink eventSink = new EventSink() {
+  @Provides
+  @Singleton
+  EventSink provideEventSink(EventBus eventBus) {
+    return new EventSink() {
       @Override
       public void post(PubsubEvent event) {
         eventBus.post(event);
       }
     };
-    bind(EventSink.class).toInstance(eventSink);
-
-    // Ensure at least an empty binding is present.
-    getSubscriberBinder(binder());
-    // TODO(ksweeney): Would this be better as a scheduler active service?
-    SchedulerServicesModule.addAppStartupServiceBinding(binder()).to(RegisterSubscribers.class);
   }
 
   private class DeadEventHandler {
@@ -135,32 +145,6 @@ public final class PubsubEventModule extends AbstractModule {
     }
   }
 
-  static class RegisterGauges implements Command {
-    private final StatsProvider statsProvider;
-    private final LinkedBlockingQueue<Runnable> pubsubQueue;
-
-    @Inject
-    RegisterGauges(
-        StatsProvider statsProvider,
-        @PubsubExecutorQueue LinkedBlockingQueue<Runnable> pubsubQueue) {
-
-      this.statsProvider = requireNonNull(statsProvider);
-      this.pubsubQueue = requireNonNull(pubsubQueue);
-    }
-
-    @Override
-    public void execute() throws RuntimeException {
-      statsProvider.makeGauge(
-          PUBSUB_EXECUTOR_QUEUE_GAUGE,
-          new Supplier<Integer>() {
-            @Override
-            public Integer get() {
-              return pubsubQueue.size();
-            }
-          });
-    }
-  }
-
   static class RegisterSubscribers extends AbstractIdleService {
     private final EventBus eventBus;
     private final Set<EventSubscriber> subscribers;

http://git-wip-us.apache.org/repos/asf/aurora/blob/b86b293e/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java b/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java
index 0faee92..a295fe8 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java
@@ -18,7 +18,6 @@ import java.util.concurrent.ScheduledExecutorService;
 
 import javax.inject.Singleton;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.common.eventbus.EventBus;
 import com.google.common.testing.TearDown;
 import com.google.inject.AbstractModule;
@@ -139,7 +138,7 @@ public class KillRetryTest extends EasyMockTest {
     clock.advance(Amount.of(100L, Time.MILLISECONDS));
     clock.advance(Amount.of(1000L, Time.MILLISECONDS));
     clock.advance(Amount.of(10000L, Time.MILLISECONDS));
-    assertEquals(ImmutableMap.of(KillRetry.RETRIES_COUNTER, 2L), statsProvider.getAllValues());
+    assertEquals(2L, statsProvider.getLongValue(KillRetry.RETRIES_COUNTER));
   }
 
   @Test
@@ -153,6 +152,6 @@ public class KillRetryTest extends EasyMockTest {
 
     moveToKilling(taskId);
     clock.advance(Amount.of(100L, Time.MILLISECONDS));
-    assertEquals(ImmutableMap.of(KillRetry.RETRIES_COUNTER, 0L), statsProvider.getAllValues());
+    assertEquals(0L, statsProvider.getLongValue(KillRetry.RETRIES_COUNTER));
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/b86b293e/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index 9afd7df..45adb2e 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -52,6 +52,7 @@ import org.apache.aurora.scheduler.storage.db.DbUtil;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.apache.mesos.Protos.TaskInfo;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
@@ -121,7 +122,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
             bind(StateManager.class).toInstance(stateManager);
             bind(TaskAssigner.class).toInstance(assigner);
             bind(Clock.class).toInstance(createMock(Clock.class));
-            bind(StatsProvider.class).toInstance(createMock(StatsProvider.class));
+            bind(StatsProvider.class).toInstance(new FakeStatsProvider());
             bind(Storage.class).toInstance(storageImpl);
             PubsubEventModule.bindSubscriber(binder(), TaskScheduler.class);
           }

http://git-wip-us.apache.org/repos/asf/aurora/blob/b86b293e/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java b/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java
index c6295e5..4c189c1 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java
@@ -13,25 +13,31 @@
  */
 package org.apache.aurora.scheduler.events;
 
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Key;
+import com.google.inject.Module;
 import com.twitter.common.application.StartupStage;
 import com.twitter.common.application.modules.LifecycleModule;
 import com.twitter.common.base.ExceptionalCommand;
 import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
+import org.apache.aurora.scheduler.SchedulerServicesModule;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.eq;
 import static org.junit.Assert.assertEquals;
 
 public class PubsubEventModuleTest extends EasyMockTest {
@@ -63,15 +69,44 @@ public class PubsubEventModuleTest extends EasyMockTest {
 
     injector.getInstance(Key.get(ExceptionalCommand.class, StartupStage.class)).execute();
     assertEquals(
-        ImmutableMap.of(PubsubEventModule.PUBSUB_EXECUTOR_QUEUE_GAUGE, 0),
-        statsProvider.getAllValues()
+        0L,
+        statsProvider.getLongValue(PubsubEventModule.PUBSUB_EXECUTOR_QUEUE_GAUGE)
     );
   }
 
-  public Injector getInjector(boolean isAsync) {
+  @Test
+  public void testPubsubExceptionTracking() throws Exception {
+    Injector injector = getInjector(
+        false,
+        new AbstractModule() {
+          @Override
+          protected void configure() {
+            PubsubEventModule.bindSubscriber(binder(), ThrowingSubscriber.class);
+          }
+        });
+
+    logger.log(eq(Level.SEVERE), anyString(), EasyMock.<Throwable>anyObject());
+
+    control.replay();
+
+    injector.getInstance(Key.get(ExceptionalCommand.class, StartupStage.class)).execute();
+    assertEquals(0L, statsProvider.getLongValue(PubsubEventModule.EXCEPTIONS_STAT));
+    injector.getInstance(EventBus.class).post("hello");
+    assertEquals(1L, statsProvider.getLongValue(PubsubEventModule.EXCEPTIONS_STAT));
+  }
+
+  static class ThrowingSubscriber implements PubsubEvent.EventSubscriber {
+    @Subscribe
+    public void receiveString(String value) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  public Injector getInjector(boolean isAsync, Module... additionalModules) {
     return Guice.createInjector(
         new LifecycleModule(),
         new PubsubEventModule(isAsync, logger),
+        new SchedulerServicesModule(),
         new AbstractModule() {
           @Override
           protected void configure() {
@@ -81,6 +116,9 @@ public class PubsubEventModuleTest extends EasyMockTest {
             bind(StatsProvider.class).toInstance(statsProvider);
             PubsubEventModule.bindSchedulingFilterDelegate(binder())
                 .toInstance(createMock(SchedulingFilter.class));
+            for (Module module : additionalModules) {
+              install(module);
+            }
           }
         });
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/b86b293e/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
index cd85b80..ba2edd8 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
@@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
+import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
 import org.apache.aurora.gen.AssignedTask;
@@ -39,6 +40,7 @@ import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -76,6 +78,7 @@ public class MaintenanceControllerImplTest extends EasyMockTest {
             StateModule.bindMaintenanceController(binder());
             bind(Storage.class).toInstance(storageUtil.storage);
             bind(StateManager.class).toInstance(stateManager);
+            bind(StatsProvider.class).toInstance(new FakeStatsProvider());
           }
         });
     maintenance = injector.getInstance(MaintenanceController.class);


Mime
View raw message