Return-Path: X-Original-To: apmail-aurora-commits-archive@minotaur.apache.org Delivered-To: apmail-aurora-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 34E2318775 for ; Thu, 9 Jul 2015 18:22:58 +0000 (UTC) Received: (qmail 94596 invoked by uid 500); 9 Jul 2015 18:22:58 -0000 Delivered-To: apmail-aurora-commits-archive@aurora.apache.org Received: (qmail 94561 invoked by uid 500); 9 Jul 2015 18:22:58 -0000 Mailing-List: contact commits-help@aurora.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.apache.org Delivered-To: mailing list commits@aurora.apache.org Received: (qmail 94552 invoked by uid 99); 9 Jul 2015 18:22:58 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Jul 2015 18:22:58 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E9713E6825; Thu, 9 Jul 2015 18:22:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wfarner@apache.org To: commits@aurora.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: aurora git commit: Add a stat that tracks uncaught exceptions in pubsub event handlers. Date: Thu, 9 Jul 2015 18:22:57 +0000 (UTC) Repository: aurora Updated Branches: refs/heads/master 8efcd0698 -> b86b293e6 Add a stat that tracks uncaught exceptions in pubsub event handlers. Reviewed at https://reviews.apache.org/r/36329/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/b86b293e Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/b86b293e Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/b86b293e Branch: refs/heads/master Commit: b86b293e626b1346f5adff9e6d5dd638928014b1 Parents: 8efcd06 Author: Bill Farner Authored: Thu Jul 9 11:22:39 2015 -0700 Committer: Bill Farner Committed: Thu Jul 9 11:22:39 2015 -0700 ---------------------------------------------------------------------- .../scheduler/events/PubsubEventModule.java | 94 ++++++++------------ .../aurora/scheduler/async/KillRetryTest.java | 5 +- .../scheduler/async/TaskSchedulerImplTest.java | 3 +- .../scheduler/events/PubsubEventModuleTest.java | 46 +++++++++- .../state/MaintenanceControllerImplTest.java | 3 + 5 files changed, 88 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/b86b293e/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java index 82d479e..c85979d 100644 --- a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java +++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java @@ -13,35 +13,33 @@ */ package org.apache.aurora.scheduler.events; -import java.lang.annotation.Retention; -import java.lang.annotation.Target; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; import java.util.logging.Logger; import javax.inject.Inject; -import javax.inject.Qualifier; import javax.inject.Singleton; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Supplier; import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.DeadEvent; import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; +import com.google.common.eventbus.SubscriberExceptionContext; +import com.google.common.eventbus.SubscriberExceptionHandler; import com.google.common.util.concurrent.AbstractIdleService; import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.AbstractModule; import com.google.inject.Binder; -import com.google.inject.TypeLiteral; +import com.google.inject.Provides; import com.google.inject.binder.LinkedBindingBuilder; import com.google.inject.multibindings.Multibinder; -import com.twitter.common.application.modules.LifecycleModule; import com.twitter.common.args.Arg; import com.twitter.common.args.CmdLine; import com.twitter.common.args.constraints.Positive; -import com.twitter.common.base.Command; import com.twitter.common.stats.StatsProvider; import org.apache.aurora.scheduler.SchedulerServicesModule; @@ -50,10 +48,6 @@ import org.apache.aurora.scheduler.events.NotifyingSchedulingFilter.NotifyDelega import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; import org.apache.aurora.scheduler.filter.SchedulingFilter; -import static java.lang.annotation.ElementType.FIELD; -import static java.lang.annotation.ElementType.METHOD; -import static java.lang.annotation.ElementType.PARAMETER; -import static java.lang.annotation.RetentionPolicy.RUNTIME; import static java.util.Objects.requireNonNull; /** @@ -67,9 +61,8 @@ public final class PubsubEventModule extends AbstractModule { @VisibleForTesting static final String PUBSUB_EXECUTOR_QUEUE_GAUGE = "pubsub_executor_queue_size"; - @Qualifier - @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) - private @interface PubsubExecutorQueue { } + @VisibleForTesting + static final String EXCEPTIONS_STAT = "event_bus_exceptions"; @Positive @CmdLine(name = "max_async_event_bus_threads", @@ -91,12 +84,19 @@ public final class PubsubEventModule extends AbstractModule { @Override protected void configure() { - final Executor executor; + // Ensure at least an empty binding is present. + getSubscriberBinder(binder()); + // TODO(ksweeney): Would this be better as a scheduler active service? + SchedulerServicesModule.addAppStartupServiceBinding(binder()).to(RegisterSubscribers.class); + } + + @Provides + @Singleton + EventBus provideEventBus(StatsProvider statsProvider) { + Executor executor; if (async) { LinkedBlockingQueue executorQueue = new LinkedBlockingQueue<>(); - bind(new TypeLiteral>() { }) - .annotatedWith(PubsubExecutorQueue.class) - .toInstance(executorQueue); + statsProvider.makeGauge(PUBSUB_EXECUTOR_QUEUE_GAUGE, executorQueue::size); executor = AsyncUtil.loggingExecutor( MAX_ASYNC_EVENT_BUS_THREADS.get(), @@ -104,28 +104,38 @@ public final class PubsubEventModule extends AbstractModule { executorQueue, "AsyncTaskEvents-%d", log); - - LifecycleModule.bindStartupAction(binder(), RegisterGauges.class); } else { executor = MoreExecutors.sameThreadExecutor(); } - final EventBus eventBus = new AsyncEventBus("AsyncTaskEvents", executor); + final AtomicLong subscriberExceptions = statsProvider.makeCounter(EXCEPTIONS_STAT); + EventBus eventBus = new AsyncEventBus( + executor, + new SubscriberExceptionHandler() { + @Override + public void handleException(Throwable exception, SubscriberExceptionContext context) { + subscriberExceptions.incrementAndGet(); + log.log( + Level.SEVERE, + "Failed to dispatch event to " + context.getSubscriberMethod() + ": " + exception, + exception); + } + } + ); + eventBus.register(new DeadEventHandler()); - bind(EventBus.class).toInstance(eventBus); + return eventBus; + } - EventSink eventSink = new EventSink() { + @Provides + @Singleton + EventSink provideEventSink(EventBus eventBus) { + return new EventSink() { @Override public void post(PubsubEvent event) { eventBus.post(event); } }; - bind(EventSink.class).toInstance(eventSink); - - // Ensure at least an empty binding is present. - getSubscriberBinder(binder()); - // TODO(ksweeney): Would this be better as a scheduler active service? - SchedulerServicesModule.addAppStartupServiceBinding(binder()).to(RegisterSubscribers.class); } private class DeadEventHandler { @@ -135,32 +145,6 @@ public final class PubsubEventModule extends AbstractModule { } } - static class RegisterGauges implements Command { - private final StatsProvider statsProvider; - private final LinkedBlockingQueue pubsubQueue; - - @Inject - RegisterGauges( - StatsProvider statsProvider, - @PubsubExecutorQueue LinkedBlockingQueue pubsubQueue) { - - this.statsProvider = requireNonNull(statsProvider); - this.pubsubQueue = requireNonNull(pubsubQueue); - } - - @Override - public void execute() throws RuntimeException { - statsProvider.makeGauge( - PUBSUB_EXECUTOR_QUEUE_GAUGE, - new Supplier() { - @Override - public Integer get() { - return pubsubQueue.size(); - } - }); - } - } - static class RegisterSubscribers extends AbstractIdleService { private final EventBus eventBus; private final Set subscribers; http://git-wip-us.apache.org/repos/asf/aurora/blob/b86b293e/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java b/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java index 0faee92..a295fe8 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java @@ -18,7 +18,6 @@ import java.util.concurrent.ScheduledExecutorService; import javax.inject.Singleton; -import com.google.common.collect.ImmutableMap; import com.google.common.eventbus.EventBus; import com.google.common.testing.TearDown; import com.google.inject.AbstractModule; @@ -139,7 +138,7 @@ public class KillRetryTest extends EasyMockTest { clock.advance(Amount.of(100L, Time.MILLISECONDS)); clock.advance(Amount.of(1000L, Time.MILLISECONDS)); clock.advance(Amount.of(10000L, Time.MILLISECONDS)); - assertEquals(ImmutableMap.of(KillRetry.RETRIES_COUNTER, 2L), statsProvider.getAllValues()); + assertEquals(2L, statsProvider.getLongValue(KillRetry.RETRIES_COUNTER)); } @Test @@ -153,6 +152,6 @@ public class KillRetryTest extends EasyMockTest { moveToKilling(taskId); clock.advance(Amount.of(100L, Time.MILLISECONDS)); - assertEquals(ImmutableMap.of(KillRetry.RETRIES_COUNTER, 0L), statsProvider.getAllValues()); + assertEquals(0L, statsProvider.getLongValue(KillRetry.RETRIES_COUNTER)); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/b86b293e/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java index 9afd7df..45adb2e 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java @@ -52,6 +52,7 @@ import org.apache.aurora.scheduler.storage.db.DbUtil; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; import org.apache.mesos.Protos.TaskInfo; import org.easymock.Capture; import org.easymock.EasyMock; @@ -121,7 +122,7 @@ public class TaskSchedulerImplTest extends EasyMockTest { bind(StateManager.class).toInstance(stateManager); bind(TaskAssigner.class).toInstance(assigner); bind(Clock.class).toInstance(createMock(Clock.class)); - bind(StatsProvider.class).toInstance(createMock(StatsProvider.class)); + bind(StatsProvider.class).toInstance(new FakeStatsProvider()); bind(Storage.class).toInstance(storageImpl); PubsubEventModule.bindSubscriber(binder(), TaskScheduler.class); } http://git-wip-us.apache.org/repos/asf/aurora/blob/b86b293e/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java b/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java index c6295e5..4c189c1 100644 --- a/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java +++ b/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java @@ -13,25 +13,31 @@ */ package org.apache.aurora.scheduler.events; +import java.util.logging.Level; import java.util.logging.Logger; -import com.google.common.collect.ImmutableMap; import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.Key; +import com.google.inject.Module; import com.twitter.common.application.StartupStage; import com.twitter.common.application.modules.LifecycleModule; import com.twitter.common.base.ExceptionalCommand; import com.twitter.common.stats.StatsProvider; import com.twitter.common.testing.easymock.EasyMockTest; +import org.apache.aurora.scheduler.SchedulerServicesModule; import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.easymock.EasyMock; import org.junit.Before; import org.junit.Test; +import static org.easymock.EasyMock.anyString; +import static org.easymock.EasyMock.eq; import static org.junit.Assert.assertEquals; public class PubsubEventModuleTest extends EasyMockTest { @@ -63,15 +69,44 @@ public class PubsubEventModuleTest extends EasyMockTest { injector.getInstance(Key.get(ExceptionalCommand.class, StartupStage.class)).execute(); assertEquals( - ImmutableMap.of(PubsubEventModule.PUBSUB_EXECUTOR_QUEUE_GAUGE, 0), - statsProvider.getAllValues() + 0L, + statsProvider.getLongValue(PubsubEventModule.PUBSUB_EXECUTOR_QUEUE_GAUGE) ); } - public Injector getInjector(boolean isAsync) { + @Test + public void testPubsubExceptionTracking() throws Exception { + Injector injector = getInjector( + false, + new AbstractModule() { + @Override + protected void configure() { + PubsubEventModule.bindSubscriber(binder(), ThrowingSubscriber.class); + } + }); + + logger.log(eq(Level.SEVERE), anyString(), EasyMock.anyObject()); + + control.replay(); + + injector.getInstance(Key.get(ExceptionalCommand.class, StartupStage.class)).execute(); + assertEquals(0L, statsProvider.getLongValue(PubsubEventModule.EXCEPTIONS_STAT)); + injector.getInstance(EventBus.class).post("hello"); + assertEquals(1L, statsProvider.getLongValue(PubsubEventModule.EXCEPTIONS_STAT)); + } + + static class ThrowingSubscriber implements PubsubEvent.EventSubscriber { + @Subscribe + public void receiveString(String value) { + throw new UnsupportedOperationException(); + } + } + + public Injector getInjector(boolean isAsync, Module... additionalModules) { return Guice.createInjector( new LifecycleModule(), new PubsubEventModule(isAsync, logger), + new SchedulerServicesModule(), new AbstractModule() { @Override protected void configure() { @@ -81,6 +116,9 @@ public class PubsubEventModuleTest extends EasyMockTest { bind(StatsProvider.class).toInstance(statsProvider); PubsubEventModule.bindSchedulingFilterDelegate(binder()) .toInstance(createMock(SchedulingFilter.class)); + for (Module module : additionalModules) { + install(module); + } } }); } http://git-wip-us.apache.org/repos/asf/aurora/blob/b86b293e/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java index cd85b80..ba2edd8 100644 --- a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableSet; import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; +import com.twitter.common.stats.StatsProvider; import com.twitter.common.testing.easymock.EasyMockTest; import org.apache.aurora.gen.AssignedTask; @@ -39,6 +40,7 @@ import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; import org.junit.Before; import org.junit.Test; @@ -76,6 +78,7 @@ public class MaintenanceControllerImplTest extends EasyMockTest { StateModule.bindMaintenanceController(binder()); bind(Storage.class).toInstance(storageUtil.storage); bind(StateManager.class).toInstance(stateManager); + bind(StatsProvider.class).toInstance(new FakeStatsProvider()); } }); maintenance = injector.getInstance(MaintenanceController.class);