aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject aurora git commit: Adding logging threadpool executor.
Date Fri, 01 May 2015 22:42:54 GMT
Repository: aurora
Updated Branches:
  refs/heads/master f3da5aea2 -> 5fc7baf5a


Adding logging threadpool executor.

Reviewed at https://reviews.apache.org/r/33456/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/5fc7baf5
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/5fc7baf5
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/5fc7baf5

Branch: refs/heads/master
Commit: 5fc7baf5aa4cf6a94247ed9a287be431a3f229c3
Parents: f3da5ae
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Fri May 1 15:37:31 2015 -0700
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Fri May 1 15:37:31 2015 -0700

----------------------------------------------------------------------
 .../apache/aurora/scheduler/base/AsyncUtil.java | 84 ++++++++++++++------
 .../scheduler/events/PubsubEventModule.java     | 14 +---
 .../aurora/scheduler/base/AsyncUtilTest.java    | 64 +++++++++------
 3 files changed, 105 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/5fc7baf5/src/main/java/org/apache/aurora/scheduler/base/AsyncUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/AsyncUtil.java b/src/main/java/org/apache/aurora/scheduler/base/AsyncUtil.java
index f657e05..d6d1350 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/AsyncUtil.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/AsyncUtil.java
@@ -13,9 +13,12 @@
  */
 package org.apache.aurora.scheduler.base;
 
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -50,30 +53,12 @@ public final class AsyncUtil {
     return new ScheduledThreadPoolExecutor(
         poolSize,
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build()) {
-
-      @Override
-      protected void afterExecute(Runnable runnable, Throwable throwable) {
-        // See java.util.concurrent.ThreadPoolExecutor#afterExecute(Runnable, Throwable)
-        // for more details and an implementation example.
-        super.afterExecute(runnable, throwable);
-        if (throwable == null) {
-          if (runnable instanceof Future) {
-            try {
-              Future<?> future = (Future<?>) runnable;
-              if (future.isDone()) {
-                future.get();
-              }
-            } catch (InterruptedException ie) {
-              Thread.currentThread().interrupt();
-            } catch (ExecutionException ee) {
-              logger.log(Level.SEVERE, ee.toString(), ee);
-            }
+          @Override
+          protected void afterExecute(Runnable runnable, Throwable throwable) {
+            super.afterExecute(runnable, throwable);
+            evaluateResult(runnable, throwable, logger);
           }
-        } else {
-          logger.log(Level.SEVERE, throwable.toString(), throwable);
-        }
-      }
-    };
+        };
   }
 
   /**
@@ -89,4 +74,57 @@ public final class AsyncUtil {
 
     return loggingScheduledExecutor(1, nameFormat, logger);
   }
+
+  /**
+   * Creates a {@link ThreadPoolExecutor} that logs unhandled errors.
+   *
+   * @param corePoolSize see {@link ThreadPoolExecutor}.
+   * @param maxPoolSize see {@link ThreadPoolExecutor}.
+   * @param workQueue see {@link ThreadPoolExecutor}.
+   * @param nameFormat Thread naming format.
+   * @param logger Logger instance.
+   * @return instance of {@link ThreadPoolExecutor} enabled to log unhandled exceptions.
+   */
+  public static ThreadPoolExecutor loggingExecutor(
+      int corePoolSize,
+      int maxPoolSize,
+      BlockingQueue<Runnable> workQueue,
+      String nameFormat,
+      final Logger logger) {
+
+    return new ThreadPoolExecutor(
+        corePoolSize,
+        maxPoolSize,
+        0L,
+        TimeUnit.MILLISECONDS,
+        workQueue,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build()) {
+          @Override
+          protected void afterExecute(Runnable runnable, Throwable throwable) {
+            super.afterExecute(runnable, throwable);
+            evaluateResult(runnable, throwable, logger);
+          }
+        };
+  }
+
+  private static void evaluateResult(Runnable runnable, Throwable throwable, Logger logger) {
+    // See java.util.concurrent.ThreadPoolExecutor#afterExecute(Runnable, Throwable)
+    // for more details and an implementation example.
+    if (throwable == null) {
+      if (runnable instanceof Future) {
+        try {
+          Future<?> future = (Future<?>) runnable;
+          if (future.isDone()) {
+            future.get();
+          }
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+        } catch (ExecutionException ee) {
+          logger.log(Level.SEVERE, ee.toString(), ee);
+        }
+      }
+    } else {
+      logger.log(Level.SEVERE, throwable.toString(), throwable);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/5fc7baf5/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
index 3a4d40a..82d479e 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
@@ -18,8 +18,6 @@ import java.lang.annotation.Target;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
@@ -34,7 +32,6 @@ import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
 import com.google.inject.TypeLiteral;
@@ -48,6 +45,7 @@ import com.twitter.common.base.Command;
 import com.twitter.common.stats.StatsProvider;
 
 import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.apache.aurora.scheduler.base.AsyncUtil;
 import org.apache.aurora.scheduler.events.NotifyingSchedulingFilter.NotifyDelegate;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
@@ -100,16 +98,12 @@ public final class PubsubEventModule extends AbstractModule {
           .annotatedWith(PubsubExecutorQueue.class)
           .toInstance(executorQueue);
 
-      executor = new ThreadPoolExecutor(
+      executor = AsyncUtil.loggingExecutor(
           MAX_ASYNC_EVENT_BUS_THREADS.get(),
           MAX_ASYNC_EVENT_BUS_THREADS.get(),
-          0L,
-          TimeUnit.MILLISECONDS,
           executorQueue,
-          new ThreadFactoryBuilder()
-              .setDaemon(true)
-              .setNameFormat("AsyncTaskEvents-%d")
-              .build());
+          "AsyncTaskEvents-%d",
+          log);
 
       LifecycleModule.bindStartupAction(binder(), RegisterGauges.class);
     } else {

http://git-wip-us.apache.org/repos/asf/aurora/blob/5fc7baf5/src/test/java/org/apache/aurora/scheduler/base/AsyncUtilTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/base/AsyncUtilTest.java b/src/test/java/org/apache/aurora/scheduler/base/AsyncUtilTest.java
index e990f52..2397186 100644
--- a/src/test/java/org/apache/aurora/scheduler/base/AsyncUtilTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/base/AsyncUtilTest.java
@@ -15,7 +15,9 @@ package org.apache.aurora.scheduler.base;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -32,46 +34,67 @@ import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expectLastCall;
 
 public class AsyncUtilTest extends EasyMockTest {
+  private static final String NAME_FORMAT = "Test-%d";
   private Logger logger;
-  private ScheduledThreadPoolExecutor executor;
   private CountDownLatch latch;
 
   @Before
   public void setUp() {
     logger = createMock(Logger.class);
     latch = new CountDownLatch(1);
-    executor = AsyncUtil.singleThreadLoggingScheduledExecutor("Test-%d", logger);
   }
 
   @Test
   public void testScheduleLogging() throws Exception {
-    logger.log(
-        eq(Level.SEVERE),
-        contains("Expected exception."),
-        EasyMock.<ExecutionException>anyObject());
+    expectLogging();
 
-    expectLastCall().andAnswer(new IAnswer<Object>() {
+    control.replay();
+
+    scheduledExecutor().schedule(new Runnable() {
       @Override
-      public Object answer() throws Throwable {
-        latch.countDown();
-        return null;
+      public void run() {
+        throw new IllegalArgumentException("Expected exception.");
       }
-    }).once();
+    }, 0, TimeUnit.MILLISECONDS);
+
+    latch.await();
+  }
+
+  @Test
+  public void testSubmitLogging() throws Exception {
+    expectLogging();
 
     control.replay();
 
-    executor.schedule(new Runnable() {
+    scheduledExecutor().submit(new Runnable() {
       @Override
       public void run() {
         throw new IllegalArgumentException("Expected exception.");
       }
-    }, 0, TimeUnit.MILLISECONDS);
+    });
 
     latch.await();
   }
 
   @Test
-  public void testSubmitLogging() throws Exception {
+  public void testExecuteLogging() throws Exception {
+    expectLogging();
+
+    control.replay();
+
+    ThreadPoolExecutor executor =
+        AsyncUtil.loggingExecutor(1, 1, new LinkedBlockingQueue<Runnable>(), NAME_FORMAT, logger);
+    executor.execute(new Runnable() {
+      @Override
+      public void run() {
+        throw new IllegalArgumentException("Expected exception.");
+      }
+    });
+
+    latch.await();
+  }
+
+  private void expectLogging() {
     logger.log(
         eq(Level.SEVERE),
         contains("Expected exception."),
@@ -84,16 +107,9 @@ public class AsyncUtilTest extends EasyMockTest {
         return null;
       }
     }).once();
+  }
 
-    control.replay();
-
-    executor.submit(new Runnable() {
-      @Override
-      public void run() {
-        throw new IllegalArgumentException("Expected exception.");
-      }
-    });
-
-    latch.await();
+  private ScheduledThreadPoolExecutor scheduledExecutor() {
+    return AsyncUtil.singleThreadLoggingScheduledExecutor(NAME_FORMAT, logger);
   }
 }


Mime
View raw message