bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eolive...@apache.org
Subject bookkeeper git commit: BOOKKEEPER-1083: Improvements on OrderedSafeExecutor
Date Thu, 01 Jun 2017 10:30:58 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master 77a01dc7e -> e33ec10aa


BOOKKEEPER-1083: Improvements on OrderedSafeExecutor

    - use ListeningScheduledExecutorService for the threads
    - as a general util class, expose chooseThread to allow applications to use a specific thread
    - add a method to force shutdown of the executor

Author: Robin Dhamankar <rdhamankar@twitter.com>
Author: Sijie Guo <sijie@apache.org>

Reviewers: Enrico Olivelli

Closes #171 from sijie/add_monitored_thread_pool


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/e33ec10a
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/e33ec10a
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/e33ec10a

Branch: refs/heads/master
Commit: e33ec10aa400f32c2e0278c15ea80a0f624e5919
Parents: 77a01dc
Author: Robin Dhamankar <rdhamankar@twitter.com>
Authored: Thu Jun 1 12:30:54 2017 +0200
Committer: Enrico Olivelli <eolivelli@apache.org>
Committed: Thu Jun 1 12:30:54 2017 +0200

----------------------------------------------------------------------
 .../bookkeeper/util/OrderedSafeExecutor.java    | 108 ++++++++++++-------
 1 file changed, 70 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e33ec10a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
index 118001c..c3554b4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
@@ -18,10 +18,12 @@
 package org.apache.bookkeeper.util;
 
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.Random;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
@@ -29,7 +31,6 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -57,7 +58,7 @@ import org.slf4j.LoggerFactory;
 public class OrderedSafeExecutor {
     final static long WARN_TIME_MICRO_SEC_DEFAULT = TimeUnit.SECONDS.toMicros(1);
     final String name;
-    final ScheduledThreadPoolExecutor threads[];
+    final ListeningScheduledExecutorService threads[];
     final long threadIds[];
     final Random rand = new Random();
     final OpStatsLogger taskExecutionStats;
@@ -170,16 +171,15 @@ public class OrderedSafeExecutor {
 
         this.warnTimeMicroSec = warnTimeMicroSec;
         name = baseName;
-        threads = new ScheduledThreadPoolExecutor[numThreads];
+        threads = new ListeningScheduledExecutorService[numThreads];
         threadIds = new long[numThreads];
         for (int i = 0; i < numThreads; i++) {
-            threads[i] =  new ScheduledThreadPoolExecutor(1,
+            final ScheduledThreadPoolExecutor thread =  new ScheduledThreadPoolExecutor(1,
                     new ThreadFactoryBuilder()
                         .setNameFormat(name + "-orderedsafeexecutor-" + i + "-%d")
                         .setThreadFactory(threadFactory)
                         .build());
-
-            // Save thread ids
+            threads[i] = MoreExecutors.listeningDecorator(thread);
             final int idx = i;
             try {
                 threads[idx].submit(new SafeRunnable() {
@@ -203,7 +203,7 @@ public class OrderedSafeExecutor {
 
                 @Override
                 public Number getSample() {
-                    return threads[idx].getQueue().size();
+                    return thread.getQueue().size();
                 }
             });
             statsLogger.registerGauge(String.format("%s-completed-tasks-%d", name, idx), new Gauge<Number>() {
@@ -214,7 +214,7 @@ public class OrderedSafeExecutor {
 
                 @Override
                 public Number getSample() {
-                    return threads[idx].getCompletedTaskCount();
+                    return thread.getCompletedTaskCount();
                 }
             });
             statsLogger.registerGauge(String.format("%s-total-tasks-%d", name, idx), new Gauge<Number>() {
@@ -225,7 +225,7 @@ public class OrderedSafeExecutor {
 
                 @Override
                 public Number getSample() {
-                    return threads[idx].getTaskCount();
+                    return thread.getTaskCount();
                 }
             });
         }
@@ -236,18 +236,16 @@ public class OrderedSafeExecutor {
         this.traceTaskExecution = traceTaskExecution;
     }
 
-    ScheduledExecutorService chooseThread() {
+    public ListeningScheduledExecutorService chooseThread() {
         // skip random # generation in this special case
         if (threads.length == 1) {
             return threads[0];
         }
 
         return threads[rand.nextInt(threads.length)];
-
     }
 
-    ScheduledExecutorService chooseThread(Object orderingKey) {
-        // skip hashcode generation in this special case
+    public ListeningScheduledExecutorService chooseThread(Object orderingKey) {
         if (threads.length == 1) {
             return threads[0];
         }
@@ -255,8 +253,13 @@ public class OrderedSafeExecutor {
         return threads[MathUtils.signSafeMod(orderingKey.hashCode(), threads.length)];
     }
 
-    ExecutorService chooseThread(long orderingKey) {
-        // skip hashcode generation in this special case
+    /**
+     * skip hashcode generation in this special case
+     *
+     * @param orderingKey long ordering key
+     * @return the thread for executing this order key
+     */
+    public ListeningScheduledExecutorService chooseThread(long orderingKey) {
         if (threads.length == 1) {
             return threads[0];
         }
@@ -276,7 +279,16 @@ public class OrderedSafeExecutor {
      * schedules a one time action to execute
      */
     public void submit(SafeRunnable r) {
-        chooseThread().execute(timedRunnable(r));
+        chooseThread().submit(timedRunnable(r));
+    }
+
+    /**
+     * schedules a one time action to execute with an ordering guarantee on the key
+     * @param orderingKey
+     * @param r
+     */
+    public ListenableFuture<?> submitOrdered(Object orderingKey, SafeRunnable r) {
+        return chooseThread(orderingKey).submit(timedRunnable(r));
     }
 
     /**
@@ -284,8 +296,28 @@ public class OrderedSafeExecutor {
      * @param orderingKey
      * @param r
      */
-    public void submitOrdered(Object orderingKey, SafeRunnable r) {
-        chooseThread(orderingKey).execute(timedRunnable(r));
+    public void submitOrdered(long orderingKey, SafeRunnable r) {
+        chooseThread(orderingKey).execute(r);
+    }
+
+    /**
+     * schedules a one time action to execute with an ordering guarantee on the key
+     * @param orderingKey
+     * @param r
+     */
+    public void submitOrdered(int orderingKey, SafeRunnable r) {
+        chooseThread(orderingKey).execute(r);
+    }
+
+    /**
+     * schedules a one time action to execute with an ordering guarantee on the key.
+     *
+     * @param orderingKey
+     * @param callable
+     */
+    public <T> ListenableFuture<T> submitOrdered(Object orderingKey,
+                                                 java.util.concurrent.Callable<T> callable) {
+        return chooseThread(orderingKey).submit(callable);
     }
 
     /**
@@ -386,24 +418,6 @@ public class OrderedSafeExecutor {
         return chooseThread(orderingKey).scheduleWithFixedDelay(command, initialDelay, delay, unit);
     }
 
-    /**
-     * schedules a one time action to execute with an ordering guarantee on the key
-     * @param orderingKey
-     * @param r
-     */
-    public void submitOrdered(long orderingKey, SafeRunnable r) {
-        chooseThread(orderingKey).execute(r);
-    }
-
-    /**
-     * schedules a one time action to execute with an ordering guarantee on the key
-     * @param orderingKey
-     * @param r
-     */
-    public void submitOrdered(int orderingKey, SafeRunnable r) {
-        chooseThread(orderingKey).execute(r);
-    }
-
     private long getThreadID(long orderingKey) {
         // skip hashcode generation in this special case
         if (threadIds.length == 1) {
@@ -428,12 +442,30 @@ public class OrderedSafeExecutor {
     }
 
     /**
+     * Force threads shutdown (cancel active requests) after specified delay,
+     * to be used after shutdown() rejects new requests.
+     */
+    public void forceShutdown(long timeout, TimeUnit unit) {
+        for (int i = 0; i < threads.length; i++) {
+            try {
+                if (!threads[i].awaitTermination(timeout, unit)) {
+                    threads[i].shutdownNow();
+                }
+            }
+            catch (InterruptedException exception) {
+                threads[i].shutdownNow();
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
      * Generic callback implementation which will run the
      * callback in the thread which matches the ordering key
      */
     public static abstract class OrderedSafeGenericCallback<T>
             implements GenericCallback<T> {
-        private final Logger LOG = LoggerFactory.getLogger(OrderedSafeGenericCallback.class);
+        private static final Logger LOG = LoggerFactory.getLogger(OrderedSafeGenericCallback.class);
 
         private final OrderedSafeExecutor executor;
         private final long orderingKey;


Mime
View raw message