bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eolive...@apache.org
Subject [bookkeeper] branch master updated: ISSUE #527: Introduce backoff and retry utilities
Date Tue, 03 Oct 2017 10:46:20 GMT
This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 8399707  ISSUE #527: Introduce backoff and retry utilities
8399707 is described below

commit 8399707f054e5d387e1af18384efab4cd1b26e57
Author: Jia Zhai <zhaijia@apache.org>
AuthorDate: Tue Oct 3 12:46:14 2017 +0200

    ISSUE #527: Introduce backoff and retry utilities
    
    Descriptions of the changes in this PR:
    Mainly want to  Introduce backoff and retires utilities:
    Backoff implements various backoff strategiesis, and id intended to determine the duration after which a task is to be retried.
    Retries is a util class for supporting retries with customized backoff.
    
    Author: Jia Zhai <zhaijia@apache.org>
    
    Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Sijie Guo <sijie@apache.org>
    
    This closes #528 from zhaijack/retries_backoff, closes #527
---
 bookkeeper-common/pom.xml                          |  11 +
 .../common/concurrent/FutureEventListener.java     |  44 +++
 .../bookkeeper/common/concurrent/FutureUtils.java  | 368 ++++++++++++++++++++
 .../bookkeeper/common/concurrent/package-info.java |  22 ++
 .../bookkeeper/common/stats/OpStatsListener.java   |  55 +++
 .../bookkeeper/common/stats/package-info.java      |  21 ++
 .../org/apache/bookkeeper/common/util/Backoff.java | 288 +++++++++++++++
 .../apache/bookkeeper/common}/util/MathUtils.java  |  23 +-
 .../bookkeeper/common/util/OrderedScheduler.java   | 193 +++++-----
 .../org/apache/bookkeeper/common/util/Retries.java | 142 ++++++++
 .../bookkeeper/common}/util/SafeRunnable.java      |  56 +--
 .../apache/bookkeeper/common/util/StreamUtil.java  |  70 ++++
 .../common/concurrent/TestFutureUtils.java         | 387 +++++++++++++++++++++
 .../apache/bookkeeper/common/util/TestBackoff.java | 191 ++++++++++
 .../bookkeeper/common/util/TestMathUtils.java      |  61 ++++
 bookkeeper-server/pom.xml                          |  15 +-
 .../main/java/org/apache/bookkeeper/util/Main.java |  54 ---
 .../java/org/apache/bookkeeper/util/MathUtils.java |  84 +----
 .../bookkeeper/util/OrderedSafeExecutor.java       | 271 +--------------
 .../org/apache/bookkeeper/util/SafeRunnable.java   |  23 +-
 .../main/resources/bookkeeper/findbugsExclude.xml  |  21 ++
 pom.xml                                            |   2 +
 22 files changed, 1829 insertions(+), 573 deletions(-)

diff --git a/bookkeeper-common/pom.xml b/bookkeeper-common/pom.xml
index 4961e4a..b369630 100644
--- a/bookkeeper-common/pom.xml
+++ b/bookkeeper-common/pom.xml
@@ -36,6 +36,17 @@
       <artifactId>guava</artifactId>
       <version>${guava.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+      <version>${google.code.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>${commons-lang3.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/concurrent/FutureEventListener.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/concurrent/FutureEventListener.java
new file mode 100644
index 0000000..6b71800
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/concurrent/FutureEventListener.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.concurrent;
+
+import java.util.concurrent.CompletionException;
+import java.util.function.BiConsumer;
+
+/**
+ * Provide similar interface (as twitter future) over java future.
+ */
+public interface FutureEventListener<T> extends BiConsumer<T, Throwable> {
+
+  void onSuccess(T value);
+
+  void onFailure(Throwable cause);
+
+  @Override
+  default void accept(T t, Throwable throwable) {
+    if (null != throwable) {
+      if (throwable instanceof CompletionException && null != throwable.getCause()) {
+        onFailure(throwable.getCause());
+      } else {
+        onFailure(throwable);
+      }
+      return;
+    }
+    onSuccess(t);
+  }
+}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/concurrent/FutureUtils.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/concurrent/FutureUtils.java
new file mode 100644
index 0000000..ab2d1ca
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/concurrent/FutureUtils.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.concurrent;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.stats.OpStatsListener;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+
+/**
+ * Future related utils.
+ */
+@Slf4j
+public final class FutureUtils {
+
+    private FutureUtils() {}
+
+    private static final Function<Throwable, Exception> DEFAULT_EXCEPTION_HANDLER = cause -> {
+        if (cause instanceof Exception) {
+            return (Exception) cause;
+        } else {
+            return new Exception(cause);
+        }
+    };
+
+    public static CompletableFuture<Void> Void() {
+        return value(null);
+    }
+
+    public static <T> T result(CompletableFuture<T> future) throws Exception {
+        return FutureUtils.result(future, DEFAULT_EXCEPTION_HANDLER);
+    }
+
+    public static <T> T result(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) throws Exception {
+        return FutureUtils.result(future, DEFAULT_EXCEPTION_HANDLER, timeout, timeUnit);
+    }
+
+    @SneakyThrows(InterruptedException.class)
+    public static <T, ExceptionT extends Throwable> T result(
+        CompletableFuture<T> future, Function<Throwable, ExceptionT> exceptionHandler) throws ExceptionT {
+        try {
+            return future.get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw e;
+        } catch (ExecutionException e) {
+            ExceptionT cause = exceptionHandler.apply(e.getCause());
+            if (null == cause) {
+                return null;
+            } else {
+                throw cause;
+            }
+        }
+    }
+
+    @SneakyThrows(InterruptedException.class)
+    public static <T, ExceptionT extends Throwable> T result(
+        CompletableFuture<T> future,
+        Function<Throwable, ExceptionT> exceptionHandler,
+        long timeout,
+        TimeUnit timeUnit) throws ExceptionT, TimeoutException {
+        try {
+            return future.get(timeout, timeUnit);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw e;
+        } catch (ExecutionException e) {
+            ExceptionT cause = exceptionHandler.apply(e.getCause());
+            if (null == cause) {
+                return null;
+            } else {
+                throw cause;
+            }
+        }
+    }
+
+    public static <T> CompletableFuture<T> createFuture() {
+        return new CompletableFuture<T>();
+    }
+
+    public static <T> CompletableFuture<T> value(T value) {
+        return CompletableFuture.completedFuture(value);
+    }
+
+    public static <T> CompletableFuture<T> exception(Throwable cause) {
+        CompletableFuture<T> future = FutureUtils.createFuture();
+        future.completeExceptionally(cause);
+        return future;
+    }
+
+    public static <T> void complete(CompletableFuture<T> result,
+                                    T value) {
+        if (null == result) {
+            return;
+        }
+        result.complete(value);
+    }
+
+    public static <T> void completeExceptionally(CompletableFuture<T> result,
+                                                 Throwable cause) {
+        if (null == result) {
+            return;
+        }
+        result.completeExceptionally(cause);
+    }
+
+    /**
+     * Completing the {@code future} in the thread in the scheduler identified by
+     * the {@code scheduleKey}.
+     *
+     * @param future      future to complete
+     * @param action      action to execute when complete
+     * @param scheduler   scheduler to execute the action.
+     * @param scheduleKey key to choose the thread to execute the action
+     * @param <T>
+     * @return
+     */
+    public static <T> CompletableFuture<T> whenCompleteAsync(
+        CompletableFuture<T> future,
+        BiConsumer<? super T, ? super Throwable> action,
+        OrderedScheduler scheduler,
+        Object scheduleKey) {
+        return future.whenCompleteAsync(action, scheduler.chooseThread(scheduleKey));
+    }
+
+    public static <T> CompletableFuture<List<T>> collect(List<CompletableFuture<T>> futureList) {
+        CompletableFuture<Void> finalFuture =
+            CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));
+        return finalFuture.thenApply(result ->
+            futureList
+                .stream()
+                .map(CompletableFuture::join)
+                .collect(Collectors.toList()));
+    }
+
+    public static <T> void proxyTo(CompletableFuture<T> src,
+                                   CompletableFuture<T> target) {
+        src.whenComplete((value, cause) -> {
+            if (null == cause) {
+                target.complete(value);
+            } else {
+                target.completeExceptionally(cause);
+            }
+        });
+    }
+
+    //
+    // Process futures
+    //
+
+    private static class ListFutureProcessor<T, R>
+        implements FutureEventListener<R>, Runnable {
+
+        private volatile boolean done = false;
+        private final Iterator<T> itemsIter;
+        private final Function<T, CompletableFuture<R>> processFunc;
+        private final CompletableFuture<List<R>> promise;
+        private final List<R> results;
+        private final ExecutorService callbackExecutor;
+
+        ListFutureProcessor(List<T> items,
+                            Function<T, CompletableFuture<R>> processFunc,
+                            ExecutorService callbackExecutor) {
+            this.itemsIter = items.iterator();
+            this.processFunc = processFunc;
+            this.promise = new CompletableFuture<>();
+            this.results = Lists.newArrayListWithExpectedSize(items.size());
+            this.callbackExecutor = callbackExecutor;
+        }
+
+        @Override
+        public void onSuccess(R value) {
+            results.add(value);
+            if (null == callbackExecutor) {
+                run();
+            } else {
+                callbackExecutor.submit(this);
+            }
+        }
+
+        @Override
+        public void onFailure(final Throwable cause) {
+            done = true;
+
+            if (null == callbackExecutor) {
+                promise.completeExceptionally(cause);
+            } else {
+                callbackExecutor.submit((Runnable) () -> promise.completeExceptionally(cause));
+            }
+        }
+
+        @Override
+        public void run() {
+            if (done) {
+                log.debug("ListFutureProcessor is interrupted.");
+                return;
+            }
+            if (!itemsIter.hasNext()) {
+                promise.complete(results);
+                done = true;
+                return;
+            }
+            processFunc.apply(itemsIter.next()).whenComplete(this);
+        }
+    }
+
+    /**
+     * Process the list of items one by one using the process function <i>processFunc</i>.
+     * The process will be stopped immediately if it fails on processing any one.
+     *
+     * @param collection       list of items
+     * @param processFunc      process function
+     * @param callbackExecutor executor to process the item
+     * @return future presents the list of processed results
+     */
+    public static <T, R> CompletableFuture<List<R>> processList(List<T> collection,
+                                                                Function<T, CompletableFuture<R>> processFunc,
+                                                                @Nullable ExecutorService callbackExecutor) {
+        ListFutureProcessor<T, R> processor =
+            new ListFutureProcessor<T, R>(collection, processFunc, callbackExecutor);
+        if (null != callbackExecutor) {
+            callbackExecutor.submit(processor);
+        } else {
+            processor.run();
+        }
+        return processor.promise;
+    }
+
+    /**
+     * Raise an exception to the <i>promise</i> within a given <i>timeout</i> period.
+     * If the promise has been satisfied before raising, it won't change the state of the promise.
+     *
+     * @param promise   promise to raise exception
+     * @param timeout   timeout period
+     * @param unit      timeout period unit
+     * @param cause     cause to raise
+     * @param scheduler scheduler to execute raising exception
+     * @param key       the submit key used by the scheduler
+     * @return the promise applied with the raise logic
+     */
+    public static <T> CompletableFuture<T> within(final CompletableFuture<T> promise,
+                                                  final long timeout,
+                                                  final TimeUnit unit,
+                                                  final Throwable cause,
+                                                  final OrderedScheduler scheduler,
+                                                  final Object key) {
+        if (timeout < 0 || promise.isDone()) {
+            return promise;
+        }
+        // schedule a timeout to raise timeout exception
+        final java.util.concurrent.ScheduledFuture<?> task = scheduler.scheduleOrdered(key, () -> {
+            if (!promise.isDone() && promise.completeExceptionally(cause)) {
+                log.info("Raise exception", cause);
+            }
+        }, timeout, unit);
+        // when the promise is satisfied, cancel the timeout task
+        promise.whenComplete((value, throwable) -> {
+                if (!task.cancel(true)) {
+                    log.debug("Failed to cancel the timeout task");
+                }
+            }
+        );
+        return promise;
+    }
+
+    /**
+     * Ignore exception from the <i>future</i>.
+     *
+     * @param future the original future
+     * @return a transformed future ignores exceptions
+     */
+    public static <T> CompletableFuture<Void> ignore(CompletableFuture<T> future) {
+        return ignore(future, null);
+    }
+
+    /**
+     * Ignore exception from the <i>future</i> and log <i>errorMsg</i> on exceptions.
+     *
+     * @param future   the original future
+     * @param errorMsg the error message to log on exceptions
+     * @return a transformed future ignores exceptions
+     */
+    public static <T> CompletableFuture<Void> ignore(CompletableFuture<T> future,
+                                                     final String errorMsg) {
+        final CompletableFuture<Void> promise = new CompletableFuture<Void>();
+        future.whenComplete(new FutureEventListener<T>() {
+            @Override
+            public void onSuccess(T value) {
+                promise.complete(null);
+            }
+
+            @Override
+            public void onFailure(Throwable cause) {
+                if (null != errorMsg) {
+                    log.error(errorMsg, cause);
+                }
+                promise.complete(null);
+            }
+        });
+        return promise;
+    }
+
+    public static <T> CompletableFuture<T> ensure(CompletableFuture<T> future,
+                                                  Runnable ensureBlock) {
+        return future.whenComplete((value, cause) -> {
+            ensureBlock.run();
+        });
+    }
+
+    public static <T> CompletableFuture<T> rescue(CompletableFuture<T> future,
+                                                  Function<Throwable, CompletableFuture<T>> rescueFuc) {
+        CompletableFuture<T> result = FutureUtils.createFuture();
+        future.whenComplete((value, cause) -> {
+            if (null == cause) {
+                result.complete(value);
+                return;
+            }
+            proxyTo(rescueFuc.apply(cause), result);
+        });
+        return result;
+    }
+
+    /**
+      * Add a event listener over <i>result</i> for collecting the operation stats.
+      *
+      * @param result result to listen on
+      * @param opStatsLogger stats logger to record operations stats
+      * @param stopwatch stop watch to time operation
+      * @param <T>
+      * @return result after registered the event listener
+      */
+    public static <T> CompletableFuture<T> stats(CompletableFuture<T> result,
+                                                 OpStatsLogger opStatsLogger,
+                                                 Stopwatch stopwatch) {
+        return result.whenComplete(new OpStatsListener<T>(opStatsLogger, stopwatch));
+    }
+
+}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/concurrent/package-info.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/concurrent/package-info.java
new file mode 100644
index 0000000..7f4d098
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/concurrent/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utility classes commonly useful in concurrent programming.
+ */
+package org.apache.bookkeeper.common.concurrent;
\ No newline at end of file
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/OpStatsListener.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/OpStatsListener.java
new file mode 100644
index 0000000..ca6eb74
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/OpStatsListener.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.stats;
+
+import com.google.common.base.Stopwatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.common.concurrent.FutureEventListener;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+
+/**
+ * A {@link FutureEventListener} monitors the stats for a given operation.
+ */
+public class OpStatsListener<T> implements FutureEventListener<T> {
+
+    private final OpStatsLogger opStatsLogger;
+    private final Stopwatch stopwatch;
+
+    public OpStatsListener(OpStatsLogger opStatsLogger, Stopwatch stopwatch) {
+        this.opStatsLogger = opStatsLogger;
+        if (null == stopwatch) {
+            this.stopwatch = Stopwatch.createStarted();
+        } else {
+            this.stopwatch = stopwatch;
+        }
+    }
+
+    public OpStatsListener(OpStatsLogger opStatsLogger) {
+        this(opStatsLogger, null);
+    }
+
+    @Override
+    public void onSuccess(T value) {
+        opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
+    }
+
+    @Override
+    public void onFailure(Throwable cause) {
+        opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
+    }
+}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/package-info.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/package-info.java
new file mode 100644
index 0000000..f211381
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Stats Related Utils.
+ */
+package org.apache.bookkeeper.common.stats;
\ No newline at end of file
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Backoff.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Backoff.java
new file mode 100644
index 0000000..085e4cd
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Backoff.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+import lombok.Data;
+import lombok.ToString;
+
+/**
+ * Implements various backoff strategies.
+ *
+ * <p>Strategies are defined by a {@link java.util.stream.Stream} of durations
+ * and are intended to determine the duration after which a task is to be
+ * retried.
+ */
+public class Backoff {
+
+    private static final int MaxBitShift = 62;
+
+    /**
+     * Back off policy.
+     *
+     * <p>It defines a stream of time durations that will be used for backing off.
+     */
+    public interface Policy {
+
+        Policy NONE = () -> Stream.empty();
+
+        /**
+         * Convert the policy into a series of backoff values.
+         *
+         * @return a series of backoff values.
+         */
+        Stream<Long> toBackoffs();
+
+    }
+
+    /**
+     * A constant backoff policy.
+     */
+    @Data(staticConstructor = "of")
+    @ToString
+    public static class Constant implements Policy {
+
+        /**
+         * Create infinite constant backoff stream.
+         *
+         * <p>It is the infinite version of {@link #of(long, long)}.
+         *
+         * @param ms constant backoff time in milliseconds.
+         * @return constant backoff policy.
+         */
+        public static Constant of(long ms) {
+            return of(ms, -1);
+        }
+
+        private final long ms;
+        private final long limit;
+
+        @Override
+        public Stream<Long> toBackoffs() {
+            if (limit >= 0) {
+                return constant(ms).limit(limit);
+            } else {
+                return constant(ms);
+            }
+        }
+    }
+
+    /**
+     * A Jittered backoff policy.
+     *
+     * <p>It is an implementation of {@link http://www.awsarchitectureblog.com/2015/03/backoff.html}
+     */
+    @Data(staticConstructor = "of")
+    @ToString
+    public static class Jitter implements Policy {
+
+        enum Type {
+            DECORRELATED,
+            EQUAL,
+            EXPONENTIAL
+        }
+
+        /**
+         * Create infinite jittered backoff stream.
+         *
+         * <p>It is the infinite version of {@link #of(Type, long, long, long)}.
+         *
+         * @param type    jittered backoff type
+         * @param startMs the start backoff time in milliseconds.
+         * @param maxMs   the max backoff time in milliseconds.
+         * @return jittered backoff policy.
+         */
+        public static Jitter of(Type type, long startMs, long maxMs) {
+            return of(type, startMs, maxMs, -1);
+        }
+
+        private final Type type;
+        private final long startMs;
+        private final long maxMs;
+        private final long limit;
+
+        @Override
+        public Stream<Long> toBackoffs() {
+            Stream<Long> backoffStream;
+            switch (type) {
+                case DECORRELATED:
+                    backoffStream = decorrelatedJittered(startMs, maxMs);
+                    break;
+                case EQUAL:
+                    backoffStream = equalJittered(startMs, maxMs);
+                    break;
+                case EXPONENTIAL:
+                default:
+                    backoffStream = exponentialJittered(startMs, maxMs);
+                    break;
+            }
+            if (limit >= 0) {
+                return backoffStream.limit(limit);
+            } else {
+                return backoffStream;
+            }
+        }
+    }
+
+    /**
+     * A exponential backoff policy.
+     */
+    @Data(staticConstructor = "of")
+    @ToString
+    public static class Exponential implements Policy {
+
+        /**
+         * Create an infinite exponential backoff policy.
+         *
+         * <p>It is the infinite version of {@link #of(long, long, int, int)}.
+         *
+         * @param startMs    start backoff time in milliseconds.
+         * @param maxMs      max backoff time in milliseconds.
+         * @param multiplier the backoff multiplier
+         * @return the exponential backoff policy.
+         */
+        public static Exponential of(long startMs, long maxMs, int multiplier) {
+            return of(startMs, maxMs, multiplier, -1);
+        }
+
+        private final long startMs;
+        private final long maxMs;
+        private final int multiplier;
+        private final int limit;
+
+        @Override
+        public Stream<Long> toBackoffs() {
+            if (limit >= 0) {
+                return exponential(startMs, multiplier, maxMs).limit(limit);
+            } else {
+                return exponential(startMs, multiplier, maxMs);
+            }
+        }
+    }
+
+    /**
+     * Create a stream with constant backoffs.
+     *
+     * @param startMs initial backoff in milliseconds
+     * @return a stream with constant backoff values.
+     */
+    public static Stream<Long> constant(long startMs) {
+        return Stream.iterate(startMs, lastMs -> startMs);
+    }
+
+    /**
+     * Create a stream with exponential backoffs.
+     *
+     * @param startMs    initial backoff in milliseconds.
+     * @param multiplier the multiplier for next backoff.
+     * @param maxMs      max backoff in milliseconds.
+     * @return a stream with exponential backoffs.
+     */
+    public static Stream<Long> exponential(long startMs,
+                                           int multiplier,
+                                           long maxMs) {
+        return Stream.iterate(startMs, lastMs -> Math.min(lastMs * multiplier, maxMs));
+    }
+
+    /**
+     * Create a stream of exponential backoffs with jitters.
+     *
+     * <p>This is "full jitter" via http://www.awsarchitectureblog.com/2015/03/backoff.html
+     *
+     * @param startMs initial backoff in milliseconds.
+     * @param maxMs   max backoff in milliseconds.
+     * @return a stream of exponential backoffs with jitters.
+     */
+    public static Stream<Long> exponentialJittered(long startMs,
+                                                   long maxMs) {
+        final long startNanos = TimeUnit.NANOSECONDS.convert(startMs, TimeUnit.MILLISECONDS);
+        final long maxNanos = TimeUnit.NANOSECONDS.convert(maxMs, TimeUnit.MILLISECONDS);
+        final AtomicLong attempts = new AtomicLong(1);
+        return Stream.iterate(startMs, lastMs -> {
+            long shift = Math.min(attempts.get(), MaxBitShift);
+            long maxBackoffNanos = Math.min(maxNanos, startNanos * (1L << shift));
+            long randomMs = TimeUnit.MILLISECONDS.convert(
+                ThreadLocalRandom.current().nextLong(startNanos, maxBackoffNanos),
+                TimeUnit.NANOSECONDS);
+            attempts.incrementAndGet();
+            return randomMs;
+        });
+    }
+
+    /**
+     * Create an infinite backoffs that have jitter with a random distribution
+     * between {@code startMs} and 3 times the previously selected value, capped at {@code maxMs}.
+     *
+     * <p>this is "decorrelated jitter" via http://www.awsarchitectureblog.com/2015/03/backoff.html
+     *
+     * @param startMs initial backoff in milliseconds
+     * @param maxMs   max backoff in milliseconds
+     * @return a stream of jitter backoffs.
+     */
+    public static Stream<Long> decorrelatedJittered(long startMs,
+                                                    long maxMs) {
+        final long startNanos = TimeUnit.NANOSECONDS.convert(startMs, TimeUnit.MILLISECONDS);
+        final long maxNanos = TimeUnit.NANOSECONDS.convert(maxMs, TimeUnit.MILLISECONDS);
+        return Stream.iterate(startMs, lastMs -> {
+            long lastNanos = TimeUnit.MILLISECONDS.convert(lastMs, TimeUnit.NANOSECONDS);
+            long randRange = Math.abs(lastNanos * 3 - startNanos);
+            long randBackoff;
+            if (0L == randRange) {
+                randBackoff = startNanos;
+            } else {
+                randBackoff = startNanos + ThreadLocalRandom.current().nextLong(randRange);
+            }
+            long backOffNanos = Math.min(maxNanos, randBackoff);
+            return TimeUnit.MILLISECONDS.convert(backOffNanos, TimeUnit.NANOSECONDS);
+        });
+
+    }
+
+    /**
+     * Create infinite backoffs that keep half of the exponential growth, and jitter
+     * between 0 and that amount.
+     *
+     * <p>this is "equal jitter" via http://www.awsarchitectureblog.com/2015/03/backoff.html
+     *
+     * @param startMs initial backoff in milliseconds.
+     * @param maxMs   max backoff in milliseconds.
+     * @return a stream of exponential backoffs with jitters.
+     */
+    public static Stream<Long> equalJittered(long startMs,
+                                             long maxMs) {
+        final long startNanos = TimeUnit.NANOSECONDS.convert(startMs, TimeUnit.MILLISECONDS);
+        final long maxNanos = TimeUnit.NANOSECONDS.convert(maxMs, TimeUnit.MILLISECONDS);
+        final AtomicLong attempts = new AtomicLong(1);
+        return Stream.iterate(startMs, lastMs -> {
+            long shift = Math.min(attempts.get() - 1, MaxBitShift);
+            long halfExpNanos = startNanos * (1L << shift);
+            long backoffNanos = halfExpNanos + ThreadLocalRandom.current().nextLong(halfExpNanos);
+            attempts.incrementAndGet();
+            if (backoffNanos < maxNanos) {
+                return TimeUnit.MILLISECONDS.convert(backoffNanos, TimeUnit.NANOSECONDS);
+            } else {
+                return maxMs;
+            }
+        });
+    }
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/MathUtils.java
similarity index 91%
copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
copy to bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/MathUtils.java
index 1b3044d..94999a4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/MathUtils.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,15 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.bookkeeper.util;
+package org.apache.bookkeeper.common.util;
 
 import java.util.concurrent.TimeUnit;
 
 /**
- * Provides misc math functions that don't come standard
+ * Provides misc math functions that don't come standard.
  */
 public class MathUtils {
+
     private static final long NANOSECONDS_PER_MILLISECOND = 1000000;
+
     public static int signSafeMod(long dividend, int divisor) {
         int mod = (int) (dividend % divisor);
 
@@ -32,7 +34,6 @@ public class MathUtils {
         }
 
         return mod;
-
     }
 
     public static int findNextPositivePowerOfTwo(final int value) {
@@ -45,7 +46,7 @@ public class MathUtils {
      * changes. This is appropriate to use when computing how much longer to
      * wait for an interval to expire.
      *
-     * NOTE: only use it for measuring.
+     * <p>NOTE: only use it for measuring.
      * http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/System.html#nanoTime%28%29
      *
      * @return current time in milliseconds.
@@ -60,7 +61,7 @@ public class MathUtils {
      * changes. This is appropriate to use when computing how much longer to
      * wait for an interval to expire.
      *
-     * NOTE: only use it for measuring.
+     * <p>NOTE: only use it for measuring.
      * http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/System.html#nanoTime%28%29
      *
      * @return current time in nanoseconds.
@@ -71,18 +72,18 @@ public class MathUtils {
 
     /**
      * Milliseconds elapsed since the time specified, the input is nanoTime
-     * the only conversion happens when computing the elapsed time
+     * the only conversion happens when computing the elapsed time.
      *
      * @param startNanoTime the start of the interval that we are measuring
      * @return elapsed time in milliseconds.
      */
-    public static long elapsedMSec (long startNanoTime) {
-       return (System.nanoTime() - startNanoTime)/ NANOSECONDS_PER_MILLISECOND;
+    public static long elapsedMSec(long startNanoTime) {
+       return (System.nanoTime() - startNanoTime) / NANOSECONDS_PER_MILLISECOND;
     }
 
     /**
      * Microseconds elapsed since the time specified, the input is nanoTime
-     * the only conversion happens when computing the elapsed time
+     * the only conversion happens when computing the elapsed time.
      *
      * @param startNanoTime the start of the interval that we are measuring
      * @return elapsed time in milliseconds.
@@ -93,7 +94,7 @@ public class MathUtils {
 
     /**
      * Nanoseconds elapsed since the time specified, the input is nanoTime
-     * the only conversion happens when computing the elapsed time
+     * the only conversion happens when computing the elapsed time.
      *
      * @param startNanoTime the start of the interval that we are measuring
      * @return elapsed time in milliseconds.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
similarity index 73%
copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
copy to bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
index 76f0830..41a7fa0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,9 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.bookkeeper.util;
+package org.apache.bookkeeper.common.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -25,38 +26,35 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.Random;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This class provides 2 things over the java {@link ScheduledExecutorService}.
  *
- * 1. It takes {@link SafeRunnable objects} instead of plain Runnable objects.
+ * <p>1. It takes {@link SafeRunnable objects} instead of plain Runnable objects.
  * This means that exceptions in scheduled tasks wont go unnoticed and will be
  * logged.
  *
- * 2. It supports submitting tasks with an ordering key, so that tasks submitted
+ * <p>2. It supports submitting tasks with an ordering key, so that tasks submitted
  * with the same key will always be executed in order, but tasks across
  * different keys can be unordered. This retains parallelism while retaining the
  * basic amount of ordering we want (e.g. , per ledger handle). Ordering is
  * achieved by hashing the key objects to threads by their {@link #hashCode()}
  * method.
- *
  */
-public class OrderedSafeExecutor {
-    final static long WARN_TIME_MICRO_SEC_DEFAULT = TimeUnit.SECONDS.toMicros(1);
+public class OrderedScheduler {
+
+    protected static final long WARN_TIME_MICRO_SEC_DEFAULT = TimeUnit.SECONDS.toMicros(1);
+
     final String name;
     final ListeningScheduledExecutorService threads[];
     final long threadIds[];
@@ -66,59 +64,78 @@ public class OrderedSafeExecutor {
     final boolean traceTaskExecution;
     final long warnTimeMicroSec;
 
-    public static Builder newBuilder() {
-        return new Builder();
+    /**
+     * Create a builder to build ordered scheduler.
+     *
+     * @return builder to build ordered scheduler.
+     */
+    public static SchedulerBuilder newSchedulerBuilder() {
+        return new SchedulerBuilder();
     }
 
-    public static class Builder {
-        private String name = "OrderedSafeExecutor";
-        private int numThreads = Runtime.getRuntime().availableProcessors();
-        private ThreadFactory threadFactory = null;
-        private StatsLogger statsLogger = NullStatsLogger.INSTANCE;
-        private boolean traceTaskExecution = false;
-        private long warnTimeMicroSec = WARN_TIME_MICRO_SEC_DEFAULT;
+    /**
+     * Builder to build ordered scheduler.
+     */
+    public static class SchedulerBuilder extends AbstractBuilder<OrderedScheduler> {}
 
-        public Builder name(String name) {
+    /**
+     * Abstract builder class to build {@link OrderedScheduler}.
+     */
+    public abstract static class AbstractBuilder<T extends OrderedScheduler> {
+        protected String name = getClass().getSimpleName();
+        protected int numThreads = Runtime.getRuntime().availableProcessors();
+        protected ThreadFactory threadFactory = null;
+        protected StatsLogger statsLogger = NullStatsLogger.INSTANCE;
+        protected boolean traceTaskExecution = false;
+        protected long warnTimeMicroSec = WARN_TIME_MICRO_SEC_DEFAULT;
+
+        public AbstractBuilder<T> name(String name) {
             this.name = name;
             return this;
         }
 
-        public Builder numThreads(int num) {
+        public AbstractBuilder<T> numThreads(int num) {
             this.numThreads = num;
             return this;
         }
 
-        public Builder threadFactory(ThreadFactory threadFactory) {
+        public AbstractBuilder<T> threadFactory(ThreadFactory threadFactory) {
             this.threadFactory = threadFactory;
             return this;
         }
 
-        public Builder statsLogger(StatsLogger statsLogger) {
+        public AbstractBuilder<T> statsLogger(StatsLogger statsLogger) {
             this.statsLogger = statsLogger;
             return this;
         }
 
-        public Builder traceTaskExecution(boolean enabled) {
+        public AbstractBuilder<T> traceTaskExecution(boolean enabled) {
             this.traceTaskExecution = enabled;
             return this;
         }
 
-        public Builder traceTaskWarnTimeMicroSec(long warnTimeMicroSec) {
+        public AbstractBuilder<T> traceTaskWarnTimeMicroSec(long warnTimeMicroSec) {
             this.warnTimeMicroSec = warnTimeMicroSec;
             return this;
         }
 
-        public OrderedSafeExecutor build() {
+        @SuppressWarnings("unchecked")
+        public T build() {
             if (null == threadFactory) {
                 threadFactory = Executors.defaultThreadFactory();
             }
-            return new OrderedSafeExecutor(name, numThreads, threadFactory, statsLogger,
-                                           traceTaskExecution, warnTimeMicroSec);
+            return (T) new OrderedScheduler(
+                name,
+                numThreads,
+                threadFactory,
+                statsLogger,
+                traceTaskExecution,
+                warnTimeMicroSec);
         }
 
     }
 
-    private class TimedRunnable extends SafeRunnable {
+    private class TimedRunnable implements SafeRunnable {
         final SafeRunnable runnable;
         final long initNanos;
 
@@ -135,20 +152,14 @@ public class OrderedSafeExecutor {
             long elapsedMicroSec = MathUtils.elapsedMicroSec(startNanos);
             taskExecutionStats.registerSuccessfulEvent(elapsedMicroSec, TimeUnit.MICROSECONDS);
             if (elapsedMicroSec >= warnTimeMicroSec) {
-                logger.warn("Runnable {}:{} took too long {} micros to execute.",
+                LOGGER.warn("Runnable {}:{} took too long {} micros to execute.",
                             new Object[] { runnable, runnable.getClass(), elapsedMicroSec });
             }
         }
-     }
-
-    @Deprecated
-    public OrderedSafeExecutor(int numThreads, String threadName) {
-        this(threadName, numThreads, Executors.defaultThreadFactory(), NullStatsLogger.INSTANCE,
-             false, WARN_TIME_MICRO_SEC_DEFAULT);
     }
 
     /**
-     * Constructs Safe executor
+     * Constructs Safe executor.
      *
      * @param numThreads
      *            - number of threads
@@ -163,11 +174,14 @@ public class OrderedSafeExecutor {
      * @param warnTimeMicroSec
      *            - log long task exec warning after this interval
      */
-    private OrderedSafeExecutor(String baseName, int numThreads, ThreadFactory threadFactory,
-                                StatsLogger statsLogger, boolean traceTaskExecution,
-                                long warnTimeMicroSec) {
-        Preconditions.checkArgument(numThreads > 0);
-        Preconditions.checkArgument(!StringUtils.isBlank(baseName));
+    protected OrderedScheduler(String baseName,
+                               int numThreads,
+                               ThreadFactory threadFactory,
+                               StatsLogger statsLogger,
+                               boolean traceTaskExecution,
+                               long warnTimeMicroSec) {
+        checkArgument(numThreads > 0);
+        checkArgument(!StringUtils.isBlank(baseName));
 
         this.warnTimeMicroSec = warnTimeMicroSec;
         name = baseName;
@@ -176,7 +190,7 @@ public class OrderedSafeExecutor {
         for (int i = 0; i < numThreads; i++) {
             final ScheduledThreadPoolExecutor thread =  new ScheduledThreadPoolExecutor(1,
                     new ThreadFactoryBuilder()
-                        .setNameFormat(name + "-orderedsafeexecutor-" + i + "-%d")
+                        .setNameFormat(name + "-" + getClass().getSimpleName() + "-" + i + "-%d")
                         .setThreadFactory(threadFactory)
                         .build());
             threads[i] = MoreExecutors.listeningDecorator(thread);
@@ -255,7 +269,7 @@ public class OrderedSafeExecutor {
     }
 
     /**
-     * skip hashcode generation in this special case
+     * skip hashcode generation in this special case.
      *
      * @param orderingKey long ordering key
      * @return the thread for executing this order key
@@ -277,14 +291,15 @@ public class OrderedSafeExecutor {
     }
 
     /**
-     * schedules a one time action to execute
+     * schedules a one time action to execute.
      */
     public void submit(SafeRunnable r) {
         chooseThread().submit(timedRunnable(r));
     }
 
     /**
-     * schedules a one time action to execute with an ordering guarantee on the key
+     * schedules a one time action to execute with an ordering guarantee on the key.
+     *
      * @param orderingKey
      * @param r
      */
@@ -293,7 +308,8 @@ public class OrderedSafeExecutor {
     }
 
     /**
-     * schedules a one time action to execute with an ordering guarantee on the key
+     * schedules a one time action to execute with an ordering guarantee on the key.
+     *
      * @param orderingKey
      * @param r
      */
@@ -302,7 +318,8 @@ public class OrderedSafeExecutor {
     }
 
     /**
-     * schedules a one time action to execute with an ordering guarantee on the key
+     * schedules a one time action to execute with an ordering guarantee on the key.
+     *
      * @param orderingKey
      * @param r
      */
@@ -327,7 +344,8 @@ public class OrderedSafeExecutor {
      * @param command - the SafeRunnable to execute
      * @param delay - the time from now to delay execution
      * @param unit - the time unit of the delay parameter
-     * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion
+     * @return a ScheduledFuture representing pending completion of the task and whose get() method
+     *         will return null upon completion
      */
     public ScheduledFuture<?> schedule(SafeRunnable command, long delay, TimeUnit unit) {
         return chooseThread().schedule(command, delay, unit);
@@ -340,7 +358,8 @@ public class OrderedSafeExecutor {
      * @param command - the SafeRunnable to execute
      * @param delay - the time from now to delay execution
      * @param unit - the time unit of the delay parameter
-     * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion
+     * @return a ScheduledFuture representing pending completion of the task and whose get() method
+     *         will return null upon completion
      */
     public ScheduledFuture<?> scheduleOrdered(Object orderingKey, SafeRunnable command, long delay, TimeUnit unit) {
         return chooseThread(orderingKey).schedule(command, delay, unit);
@@ -348,9 +367,9 @@ public class OrderedSafeExecutor {
 
     /**
      * Creates and executes a periodic action that becomes enabled first after
-     * the given initial delay, and subsequently with the given period;
+     * the given initial delay, and subsequently with the given period.
      *
-     * For more details check scheduleAtFixedRate in interface ScheduledExecutorService
+     * <p>For more details check {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
      *
      * @param command - the SafeRunnable to execute
      * @param initialDelay - the time to delay first execution
@@ -365,9 +384,9 @@ public class OrderedSafeExecutor {
 
     /**
      * Creates and executes a periodic action that becomes enabled first after
-     * the given initial delay, and subsequently with the given period;
+     * the given initial delay, and subsequently with the given period.
      *
-     * For more details check scheduleAtFixedRate in interface ScheduledExecutorService
+     * <p>For more details check {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
      *
      * @param orderingKey - the key used for ordering
      * @param command - the SafeRunnable to execute
@@ -386,7 +405,8 @@ public class OrderedSafeExecutor {
      * Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently
      * with the given delay between the termination of one execution and the commencement of the next.
      *
-     * For more details check scheduleWithFixedDelay in interface ScheduledExecutorService
+     * <p>For more details check {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}
+     * .
      *
      * @param command - the SafeRunnable to execute
      * @param initialDelay - the time to delay first execution
@@ -404,7 +424,8 @@ public class OrderedSafeExecutor {
      * Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently
      * with the given delay between the termination of one execution and the commencement of the next.
      *
-     * For more details check scheduleWithFixedDelay in interface ScheduledExecutorService
+     * <p>For more details check {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}
+     * .
      *
      * @param orderingKey - the key used for ordering
      * @param command - the SafeRunnable to execute
@@ -419,7 +440,7 @@ public class OrderedSafeExecutor {
         return chooseThread(orderingKey).scheduleWithFixedDelay(command, initialDelay, delay, unit);
     }
 
-    private long getThreadID(long orderingKey) {
+    protected long getThreadID(long orderingKey) {
         // skip hashcode generation in this special case
         if (threadIds.length == 1) {
             return threadIds[0];
@@ -452,63 +473,11 @@ public class OrderedSafeExecutor {
                 if (!threads[i].awaitTermination(timeout, unit)) {
                     threads[i].shutdownNow();
                 }
-            }
-            catch (InterruptedException exception) {
+            } catch (InterruptedException exception) {
                 threads[i].shutdownNow();
                 Thread.currentThread().interrupt();
             }
         }
     }
 
-    /**
-     * Generic callback implementation which will run the
-     * callback in the thread which matches the ordering key
-     */
-    public static abstract class OrderedSafeGenericCallback<T>
-            implements GenericCallback<T> {
-        private static final Logger LOG = LoggerFactory.getLogger(OrderedSafeGenericCallback.class);
-
-        private final OrderedSafeExecutor executor;
-        private final long orderingKey;
-
-        /**
-         * @param executor The executor on which to run the callback
-         * @param orderingKey Key used to decide which thread the callback
-         *                    should run on.
-         */
-        public OrderedSafeGenericCallback(OrderedSafeExecutor executor, long orderingKey) {
-            this.executor = executor;
-            this.orderingKey = orderingKey;
-        }
-
-        @Override
-        public final void operationComplete(final int rc, final T result) {
-            // during closing, callbacks that are error out might try to submit to
-            // the scheduler again. if the submission will go to same thread, we
-            // don't need to submit to executor again. this is also an optimization for
-            // callback submission
-            if (Thread.currentThread().getId() == executor.getThreadID(orderingKey)) {
-                safeOperationComplete(rc, result);
-            } else {
-                try {
-                    executor.submitOrdered(orderingKey, new SafeRunnable() {
-                        @Override
-                        public void safeRun() {
-                            safeOperationComplete(rc, result);
-                        }
-                        @Override
-                        public String toString() {
-                            return String.format("Callback(key=%s, name=%s)",
-                                                 orderingKey,
-                                                 OrderedSafeGenericCallback.this);
-                        }
-                    });
-                } catch (RejectedExecutionException re) {
-                    LOG.warn("Failed to submit callback for {} : ", orderingKey, re);
-                }
-            }
-        }
-
-        public abstract void safeOperationComplete(int rc, T result);
-    }
 }
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Retries.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Retries.java
new file mode 100644
index 0000000..3dd4da8
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Retries.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+
+/**
+ * A util class for supporting retries with customized backoff.
+ */
+public final class Retries {
+
+    private Retries() {
+    }
+
+    public static final Predicate<Throwable> NonFatalPredicate =
+        cause -> !(cause instanceof RuntimeException);
+
+    /**
+     * Retry a given {@code task} on failures.
+     *
+     * <p>It is a shortcut of {@link #run(Stream, Predicate, Supplier, OrderedScheduler, Object)}
+     * that runs retries on any threads in the provided {@code scheduler}.
+     *
+     * @param backoffs       a stream of backoff delays, in milliseconds.
+     * @param retryPredicate a predicate to test if failures are retryable.
+     * @param task           a task to execute.
+     * @param scheduler      scheduler to schedule the task and complete the futures.
+     * @param <ReturnT>      the return type
+     * @return future represents the result of the task with retries.
+     */
+    public static <ReturnT> CompletableFuture<ReturnT> run(
+        Stream<Long> backoffs,
+        Predicate<Throwable> retryPredicate,
+        Supplier<CompletableFuture<ReturnT>> task,
+        OrderedScheduler scheduler) {
+        return run(backoffs, retryPredicate, task, scheduler, null);
+    }
+
+    /**
+     * Retry a given {@code task} on failures.
+     *
+     * <p>It will only retry the tasks when the predicate {@code retryPredicate} tests
+     * it as a retryable failure and it doesn't exhaust the retry budget. The retry delays
+     * are defined in a stream of delay values (in milliseconds).
+     *
+     * <p>If a schedule {@code key} is provided, the {@code task} will be submitted to the
+     * scheduler using the provided schedule {@code key} and also the returned future
+     * will be completed in the same thread. Otherwise, the task and the returned future will
+     * be executed and scheduled on any threads in the scheduler.
+     *
+     * @param backoffs       a stream of backoff delays, in milliseconds.
+     * @param retryPredicate a predicate to test if failures are retryable.
+     * @param task           a task to execute.
+     * @param scheduler      scheduler to schedule the task and complete the futures.
+     * @param key            the submit key for the scheduler.
+     * @param <ReturnT>      the return tye.
+     * @return future represents the result of the task with retries.
+     */
+    public static <ReturnT> CompletableFuture<ReturnT> run(
+        Stream<Long> backoffs,
+        Predicate<Throwable> retryPredicate,
+        Supplier<CompletableFuture<ReturnT>> task,
+        OrderedScheduler scheduler,
+        Object key) {
+        CompletableFuture<ReturnT> future = FutureUtils.createFuture();
+        if (null == key) {
+            execute(
+                future,
+                backoffs.iterator(),
+                retryPredicate,
+                task,
+                scheduler,
+                null);
+        } else {
+            scheduler.submitOrdered(key, () -> execute(
+                future,
+                backoffs.iterator(),
+                retryPredicate,
+                task,
+                scheduler,
+                key));
+        }
+        return future;
+    }
+
+    private static <ReturnT> void execute(
+        CompletableFuture<ReturnT> finalResult,
+        Iterator<Long> backoffIter,
+        Predicate<Throwable> retryPredicate,
+        Supplier<CompletableFuture<ReturnT>> task,
+        OrderedScheduler scheduler,
+        Object key) {
+
+        FutureUtils.whenCompleteAsync(task.get(), (result, cause) -> {
+            if (null == cause) {
+                finalResult.complete(result);
+                return;
+            }
+            if (retryPredicate.test(cause)) {
+                if (!backoffIter.hasNext()) {
+                    // exhausts all the retry budgets, fail the task now
+                    finalResult.completeExceptionally(cause);
+                    return;
+                }
+                long nextRetryDelayMs = backoffIter.next();
+                scheduler.scheduleOrdered(key, () -> execute(
+                    finalResult,
+                    backoffIter,
+                    retryPredicate,
+                    task,
+                    scheduler,
+                    key), nextRetryDelayMs, TimeUnit.MILLISECONDS);
+            } else {
+                // the exception can not be retried
+                finalResult.completeExceptionally(cause);
+            }
+        }, scheduler, key);
+    }
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SafeRunnable.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SafeRunnable.java
similarity index 66%
copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SafeRunnable.java
copy to bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SafeRunnable.java
index 8b1e0d0..6a3cf4d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SafeRunnable.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SafeRunnable.java
@@ -1,8 +1,4 @@
-package org.apache.bookkeeper.util;
-
-import java.util.function.Consumer;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,28 +16,35 @@ import java.util.function.Consumer;
  * limitations under the License.
  */
 
+package org.apache.bookkeeper.common.util;
+
+import java.util.function.Consumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class SafeRunnable implements Runnable {
+/**
+ * A runnable that catches runtime exceptions.
+ */
+@FunctionalInterface
+public interface SafeRunnable extends Runnable {
 
-    static final Logger logger = LoggerFactory.getLogger(SafeRunnable.class);
+    Logger LOGGER = LoggerFactory.getLogger(SafeRunnable.class);
 
     @Override
-    public void run() {
+    default void run() {
         try {
             safeRun();
-        } catch(Throwable t) {
-            logger.error("Unexpected throwable caught ", t);
+        } catch (Throwable t) {
+            LOGGER.error("Unexpected throwable caught ", t);
         }
     }
 
-    public abstract void safeRun();
+    void safeRun();
 
     /**
-     * Utility method to use SafeRunnable from lambdas
-     * <p>
-     * Eg:
+     * Utility method to use SafeRunnable from lambdas.
+     *
+     * <p>Eg:
      * <pre>
      * <code>
      * executor.submit(SafeRunnable.safeRun(() -> {
@@ -50,7 +53,7 @@ public abstract class SafeRunnable implements Runnable {
      * </code>
      * </pre>
      */
-    public static SafeRunnable safeRun(Runnable runnable) {
+    static SafeRunnable safeRun(Runnable runnable) {
         return new SafeRunnable() {
             @Override
             public void safeRun() {
@@ -61,9 +64,9 @@ public abstract class SafeRunnable implements Runnable {
 
     /**
      * Utility method to use SafeRunnable from lambdas with
-     * a custom exception handler
-     * <p>
-     * Eg:
+     * a custom exception handler.
+     *
+     * <p>Eg:
      * <pre>
      * <code>
      * executor.submit(SafeRunnable.safeRun(() -> {
@@ -79,16 +82,13 @@ public abstract class SafeRunnable implements Runnable {
      *            handler that will be called when there are any exception
      * @return
      */
-    public static SafeRunnable safeRun(Runnable runnable, Consumer<Throwable> exceptionHandler) {
-        return new SafeRunnable() {
-            @Override
-            public void safeRun() {
-                try {
-                    runnable.run();
-                } catch (Throwable t) {
-                    exceptionHandler.accept(t);
-                    throw t;
-                }
+    static SafeRunnable safeRun(Runnable runnable, Consumer<Throwable> exceptionHandler) {
+        return () -> {
+            try {
+                runnable.run();
+            } catch (Throwable t) {
+                exceptionHandler.accept(t);
+                throw t;
             }
         };
     }
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/StreamUtil.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/StreamUtil.java
new file mode 100644
index 0000000..28b510a
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/StreamUtil.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.function.BiFunction;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * Java {@link java.util.stream.Stream} related operations.
+ */
+public class StreamUtil {
+
+    public static <T1, T2, R> Stream<R> zip(Stream<? extends T1> a,
+                                            Stream<? extends T2> b,
+                                            BiFunction<? super T1, ? super T2, ? extends R> zipper) {
+        Objects.requireNonNull(zipper);
+        Spliterator<? extends T1> aSpliterator = Objects.requireNonNull(a).spliterator();
+        Spliterator<? extends T2> bSpliterator = Objects.requireNonNull(b).spliterator();
+
+        // Zipping looses DISTINCT and SORTED characteristics
+        int characteristics = aSpliterator.characteristics() & bSpliterator.characteristics()
+            & ~(Spliterator.DISTINCT | Spliterator.SORTED);
+
+        long zipSize = ((characteristics & Spliterator.SIZED) != 0)
+            ? Math.min(aSpliterator.getExactSizeIfKnown(), bSpliterator.getExactSizeIfKnown())
+            : -1;
+
+        Iterator<T1> aIterator = Spliterators.iterator(aSpliterator);
+        Iterator<T2> bIterator = Spliterators.iterator(bSpliterator);
+        Iterator<R> cIterator = new Iterator<R>() {
+            @Override
+            public boolean hasNext() {
+                return aIterator.hasNext() && bIterator.hasNext();
+            }
+
+            @Override
+            public R next() {
+                return zipper.apply(aIterator.next(), bIterator.next());
+            }
+        };
+
+        Spliterator<R> split = Spliterators.spliterator(cIterator, zipSize, characteristics);
+        return (a.isParallel() || b.isParallel())
+            ? StreamSupport.stream(split, true)
+            : StreamSupport.stream(split, false);
+    }
+
+
+}
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/concurrent/TestFutureUtils.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/concurrent/TestFutureUtils.java
new file mode 100644
index 0000000..fe11e5a
--- /dev/null
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/concurrent/TestFutureUtils.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.concurrent;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.LongStream;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.SafeRunnable;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.junit.Test;
+
+/**
+ * Unit Test for {@link FutureUtils}.
+ */
+public class TestFutureUtils {
+
+    /**
+     * Test Exception.
+     */
+    static class TestException extends IOException {
+        private static final long serialVersionUID = -6256482498453846308L;
+
+        public TestException() {
+            super("test-exception");
+        }
+    }
+
+    @Test
+    public void testComplete() throws Exception {
+        CompletableFuture<Long> future = FutureUtils.createFuture();
+        FutureUtils.complete(future, 1024L);
+        assertEquals(1024L, FutureUtils.result(future).longValue());
+    }
+
+    @Test(expected = TestException.class)
+    public void testCompleteExceptionally() throws Exception {
+        CompletableFuture<Long> future = FutureUtils.createFuture();
+        FutureUtils.completeExceptionally(future, new TestException());
+        FutureUtils.result(future);
+    }
+
+    @Test
+    public void testWhenCompleteAsync() throws Exception {
+        OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder()
+            .name("test-when-complete-async")
+            .numThreads(1)
+            .build();
+        AtomicLong resultHolder = new AtomicLong(0L);
+        CountDownLatch latch = new CountDownLatch(1);
+        CompletableFuture<Long> future = FutureUtils.createFuture();
+        FutureUtils.whenCompleteAsync(
+            future,
+            (result, cause) -> {
+                resultHolder.set(result);
+                latch.countDown();
+            },
+            scheduler,
+            new Object());
+        FutureUtils.complete(future, 1234L);
+        latch.await();
+        assertEquals(1234L, resultHolder.get());
+    }
+
+    @Test
+    public void testProxyToSuccess() throws Exception {
+        CompletableFuture<Long> src = FutureUtils.createFuture();
+        CompletableFuture<Long> target = FutureUtils.createFuture();
+        FutureUtils.proxyTo(src, target);
+        FutureUtils.complete(src, 10L);
+        assertEquals(10L, FutureUtils.result(target).longValue());
+    }
+
+    @Test(expected = TestException.class)
+    public void testProxyToFailure() throws Exception {
+        CompletableFuture<Long> src = FutureUtils.createFuture();
+        CompletableFuture<Long> target = FutureUtils.createFuture();
+        FutureUtils.proxyTo(src, target);
+        FutureUtils.completeExceptionally(src, new TestException());
+        FutureUtils.result(target);
+    }
+
+    @Test
+    public void testVoid() throws Exception {
+        CompletableFuture<Void> voidFuture = FutureUtils.Void();
+        assertTrue(voidFuture.isDone());
+        assertFalse(voidFuture.isCompletedExceptionally());
+        assertFalse(voidFuture.isCancelled());
+    }
+
+    @Test
+    public void testCollectEmptyList() throws Exception {
+        List<CompletableFuture<Integer>> futures = Lists.newArrayList();
+        List<Integer> result = FutureUtils.result(FutureUtils.collect(futures));
+        assertTrue(result.isEmpty());
+    }
+
+    @Test
+    public void testCollectTenItems() throws Exception {
+        List<CompletableFuture<Integer>> futures = Lists.newArrayList();
+        List<Integer> expectedResults = Lists.newArrayList();
+        for (int i = 0; i < 10; i++) {
+            futures.add(FutureUtils.value(i));
+            expectedResults.add(i);
+        }
+        List<Integer> results = FutureUtils.result(FutureUtils.collect(futures));
+        assertEquals(expectedResults, results);
+    }
+
+    @Test(expected = TestException.class)
+    public void testCollectFailures() throws Exception {
+        List<CompletableFuture<Integer>> futures = Lists.newArrayList();
+        List<Integer> expectedResults = Lists.newArrayList();
+        for (int i = 0; i < 10; i++) {
+            if (i == 9) {
+                futures.add(FutureUtils.value(i));
+            } else {
+                futures.add(FutureUtils.exception(new TestException()));
+            }
+            expectedResults.add(i);
+        }
+        FutureUtils.result(FutureUtils.collect(futures));
+    }
+
+    @Test
+    public void testWithinAlreadyDone() throws Exception {
+        OrderedScheduler scheduler = mock(OrderedScheduler.class);
+        CompletableFuture<Long> doneFuture = FutureUtils.value(1234L);
+        CompletableFuture<Long> withinFuture = FutureUtils.within(
+            doneFuture,
+            10,
+            TimeUnit.MILLISECONDS,
+            new TestException(),
+            scheduler,
+            1234L);
+        TimeUnit.MILLISECONDS.sleep(20);
+        assertTrue(withinFuture.isDone());
+        assertFalse(withinFuture.isCancelled());
+        assertFalse(withinFuture.isCompletedExceptionally());
+        verify(scheduler, times(0))
+            .scheduleOrdered(eq(1234L), isA(SafeRunnable.class), eq(10), eq(TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testWithinZeroTimeout() throws Exception {
+        OrderedScheduler scheduler = mock(OrderedScheduler.class);
+        CompletableFuture<Long> newFuture = FutureUtils.createFuture();
+        CompletableFuture<Long> withinFuture = FutureUtils.within(
+            newFuture,
+            0,
+            TimeUnit.MILLISECONDS,
+            new TestException(),
+            scheduler,
+            1234L);
+        TimeUnit.MILLISECONDS.sleep(20);
+        assertFalse(withinFuture.isDone());
+        assertFalse(withinFuture.isCancelled());
+        assertFalse(withinFuture.isCompletedExceptionally());
+        verify(scheduler, times(0))
+            .scheduleOrdered(eq(1234L), isA(SafeRunnable.class), eq(10), eq(TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testWithinCompleteBeforeTimeout() throws Exception {
+        OrderedScheduler scheduler = mock(OrderedScheduler.class);
+        ScheduledFuture<?> scheduledFuture = mock(ScheduledFuture.class);
+        when(scheduler.scheduleOrdered(any(Object.class), any(SafeRunnable.class), anyLong(), any(TimeUnit.class)))
+            .thenAnswer(invocationOnMock -> scheduledFuture);
+        CompletableFuture<Long> newFuture = FutureUtils.createFuture();
+        CompletableFuture<Long> withinFuture = FutureUtils.within(
+            newFuture,
+            Long.MAX_VALUE,
+            TimeUnit.MILLISECONDS,
+            new TestException(),
+            scheduler,
+            1234L);
+        assertFalse(withinFuture.isDone());
+        assertFalse(withinFuture.isCancelled());
+        assertFalse(withinFuture.isCompletedExceptionally());
+
+        newFuture.complete(5678L);
+
+        assertTrue(withinFuture.isDone());
+        assertFalse(withinFuture.isCancelled());
+        assertFalse(withinFuture.isCompletedExceptionally());
+        assertEquals((Long) 5678L, FutureUtils.result(withinFuture));
+
+        verify(scheduledFuture, times(1))
+            .cancel(eq(true));
+    }
+
+    @Test
+    public void testIgnoreSuccess() {
+        CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+        CompletableFuture<Void> ignoredFuture = FutureUtils.ignore(underlyFuture);
+        underlyFuture.complete(1234L);
+        assertTrue(ignoredFuture.isDone());
+        assertFalse(ignoredFuture.isCompletedExceptionally());
+        assertFalse(ignoredFuture.isCancelled());
+    }
+
+    @Test
+    public void testIgnoreFailure() {
+        CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+        CompletableFuture<Void> ignoredFuture = FutureUtils.ignore(underlyFuture);
+        underlyFuture.completeExceptionally(new TestException());
+        assertTrue(ignoredFuture.isDone());
+        assertFalse(ignoredFuture.isCompletedExceptionally());
+        assertFalse(ignoredFuture.isCancelled());
+    }
+
+    @Test
+    public void testEnsureSuccess() throws Exception {
+        CountDownLatch ensureLatch = new CountDownLatch(1);
+        CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+        CompletableFuture<Long> ensuredFuture = FutureUtils.ensure(underlyFuture, () -> {
+            ensureLatch.countDown();
+        });
+        underlyFuture.complete(1234L);
+        FutureUtils.result(ensuredFuture);
+        assertTrue(ensuredFuture.isDone());
+        assertFalse(ensuredFuture.isCompletedExceptionally());
+        assertFalse(ensuredFuture.isCancelled());
+        ensureLatch.await();
+    }
+
+    @Test
+    public void testEnsureFailure() throws Exception {
+        CountDownLatch ensureLatch = new CountDownLatch(1);
+        CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+        CompletableFuture<Long> ensuredFuture = FutureUtils.ensure(underlyFuture, () -> {
+            ensureLatch.countDown();
+        });
+        underlyFuture.completeExceptionally(new TestException());
+        FutureUtils.result(FutureUtils.ignore(ensuredFuture));
+        assertTrue(ensuredFuture.isDone());
+        assertTrue(ensuredFuture.isCompletedExceptionally());
+        assertFalse(ensuredFuture.isCancelled());
+        ensureLatch.await();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testRescueSuccess() throws Exception {
+        CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+        Function<Throwable, CompletableFuture<Long>> rescueFuc = mock(Function.class);
+        CompletableFuture<Long> rescuedFuture = FutureUtils.rescue(underlyFuture, rescueFuc);
+        underlyFuture.complete(1234L);
+        FutureUtils.result(rescuedFuture);
+        assertTrue(rescuedFuture.isDone());
+        assertFalse(rescuedFuture.isCompletedExceptionally());
+        assertFalse(rescuedFuture.isCancelled());
+        verify(rescueFuc, times(0)).apply(any(Throwable.class));
+    }
+
+    @Test
+    public void testRescueFailure() throws Exception {
+        CompletableFuture<Long> futureCompletedAtRescue = FutureUtils.value(3456L);
+        CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+        CompletableFuture<Long> rescuedFuture = FutureUtils.rescue(underlyFuture, (cause) -> futureCompletedAtRescue);
+        underlyFuture.completeExceptionally(new TestException());
+        FutureUtils.result(rescuedFuture);
+        assertTrue(rescuedFuture.isDone());
+        assertFalse(rescuedFuture.isCompletedExceptionally());
+        assertFalse(rescuedFuture.isCancelled());
+        assertEquals((Long) 3456L, FutureUtils.result(rescuedFuture));
+    }
+
+    @Test
+    public void testStatsSuccess() throws Exception {
+        OpStatsLogger statsLogger = mock(OpStatsLogger.class);
+        CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+        CompletableFuture<Long> statsFuture = FutureUtils.stats(
+            underlyFuture,
+            statsLogger,
+            Stopwatch.createStarted());
+        underlyFuture.complete(1234L);
+        FutureUtils.result(statsFuture);
+        verify(statsLogger, times(1))
+            .registerSuccessfulEvent(anyLong(), eq(TimeUnit.MICROSECONDS));
+    }
+
+    @Test
+    public void testStatsFailure() throws Exception {
+        OpStatsLogger statsLogger = mock(OpStatsLogger.class);
+        CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+        CompletableFuture<Long> statsFuture = FutureUtils.stats(
+            underlyFuture,
+            statsLogger,
+            Stopwatch.createStarted());
+        underlyFuture.completeExceptionally(new TestException());
+        FutureUtils.result(FutureUtils.ignore(statsFuture));
+        verify(statsLogger, times(1))
+            .registerFailedEvent(anyLong(), eq(TimeUnit.MICROSECONDS));
+    }
+
+    @Test
+    public void testProcessListSuccess() throws Exception {
+        List<Long> longList = Lists.newArrayList(LongStream.range(0L, 10L).iterator());
+        List<Long> expectedList = Lists.transform(
+            longList,
+            aLong -> 2 * aLong);
+        Function<Long, CompletableFuture<Long>> sumFunc = value -> FutureUtils.value(2 * value);
+        CompletableFuture<List<Long>> totalFuture = FutureUtils.processList(
+            longList,
+            sumFunc,
+            null);
+        assertEquals(expectedList, FutureUtils.result(totalFuture));
+    }
+
+    @Test
+    public void testProcessEmptyList() throws Exception {
+        List<Long> longList = Lists.newArrayList();
+        List<Long> expectedList = Lists.transform(
+            longList,
+            aLong -> 2 * aLong);
+        Function<Long, CompletableFuture<Long>> sumFunc = value -> FutureUtils.value(2 * value);
+        CompletableFuture<List<Long>> totalFuture = FutureUtils.processList(
+            longList,
+            sumFunc,
+            null);
+        assertEquals(expectedList, FutureUtils.result(totalFuture));
+    }
+
+    @Test
+    public void testProcessListFailures() throws Exception {
+        List<Long> longList = Lists.newArrayList(LongStream.range(0L, 10L).iterator());
+        AtomicLong total = new AtomicLong(0L);
+        Function<Long, CompletableFuture<Long>> sumFunc = value -> {
+            if (value < 5) {
+                total.addAndGet(value);
+                return FutureUtils.value(2 * value);
+            } else {
+                return FutureUtils.exception(new TestException());
+            }
+        };
+        CompletableFuture<List<Long>> totalFuture = FutureUtils.processList(
+            longList,
+            sumFunc,
+            null);
+        try {
+            FutureUtils.result(totalFuture);
+            fail("Should fail with TestException");
+        } catch (TestException te) {
+            // as expected
+        }
+        assertEquals(10L, total.get());
+    }
+
+}
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestBackoff.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestBackoff.java
new file mode 100644
index 0000000..b35fcc0
--- /dev/null
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestBackoff.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import static org.apache.bookkeeper.common.util.Backoff.Jitter.Type.DECORRELATED;
+import static org.apache.bookkeeper.common.util.Backoff.Jitter.Type.EQUAL;
+import static org.apache.bookkeeper.common.util.Backoff.Jitter.Type.EXPONENTIAL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Iterator;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link Backoff}.
+ */
+public class TestBackoff {
+
+    static <T> void assertStreamEquals(Stream<T> s1, Stream<T> s2) {
+        Iterator<T> iter1 = s1.iterator(), iter2 = s2.iterator();
+        while (iter1.hasNext() && iter2.hasNext()) {
+            T expectedValue = iter1.next();
+            T actualValue = iter2.next();
+            assertEquals("Expected = " + expectedValue + ", Actual = " + actualValue,
+                expectedValue, actualValue);
+        }
+        assertTrue(!iter1.hasNext() && !iter2.hasNext());
+    }
+
+    @Test
+    public void testExponential() throws Exception {
+        Stream<Long> backoffs = Backoff.exponential(1000, 2, Long.MAX_VALUE).limit(10);
+        Stream<Long> expectedBackoffs = LongStream.range(0L, 10L).mapToObj(i -> (1000L << i));
+        assertStreamEquals(expectedBackoffs, backoffs);
+    }
+
+    @Test
+    public void testExponentialPolicy() throws Exception {
+        Stream<Long> expectedBackoffs = LongStream.range(0L, 10L).mapToObj(i -> (1000L << i));
+        Backoff.Policy policy = Backoff.Exponential.of(1000, Long.MAX_VALUE, 2, 10);
+        assertStreamEquals(expectedBackoffs, policy.toBackoffs());
+    }
+
+    @Test
+    public void testExponentialWithUpperLimit() throws Exception {
+        Stream<Long> backoffs = Backoff.exponential(1000, 2, 32000).limit(10);
+        Stream<Long> expectedBackoffs = LongStream.range(0L, 10L).mapToObj(i -> Math.min(1000L << i, 32000));
+        assertStreamEquals(expectedBackoffs, backoffs);
+    }
+
+    @Test
+    public void testExponentialPolicyWithUpperLimit() throws Exception {
+        Stream<Long> expectedBackoffs = LongStream.range(0L, 10L).mapToObj(i -> Math.min(1000L << i, 32000));
+        Backoff.Policy policy = Backoff.Exponential.of(1000, 32000, 2, 10);
+        assertStreamEquals(expectedBackoffs, policy.toBackoffs());
+    }
+
+    @Test
+    public void testExponentialJittered() throws Exception {
+        Stream<Long> backoffs = Backoff.exponentialJittered(5, 120).limit(10);
+        // Expected: 5, then randos up to: 10, 20, 40, 80, 120, 120, 120 ...
+        Stream<Long> maxBackoffs = Stream.of(5L, 10L, 20L, 40L, 80L, 120L, 120L, 120L, 120L, 120L);
+        StreamUtil.<Long, Long, Void>zip(backoffs, maxBackoffs, (expected, actual) -> {
+            assertTrue(expected <= actual);
+            return null;
+        });
+    }
+
+    @Test
+    public void testExponentialJitteredPolicy() throws Exception {
+        Stream<Long> backoffs = Backoff.Jitter.of(EXPONENTIAL, 5, 120, 10).toBackoffs();
+        // Expected: 5, then randos up to: 10, 20, 40, 80, 120, 120, 120 ...
+        Stream<Long> maxBackoffs = Stream.of(5L, 10L, 20L, 40L, 80L, 120L, 120L, 120L, 120L, 120L);
+        StreamUtil.<Long, Long, Void>zip(backoffs, maxBackoffs, (expected, actual) -> {
+            assertTrue(expected <= actual);
+            return null;
+        });
+    }
+
+    @Test
+    public void testConstant() throws Exception {
+        Stream<Long> backoffs = Backoff.constant(12345L).limit(10);
+        Stream<Long> expectedBackoffs = LongStream.range(0L, 10L).mapToObj(i -> 12345L);
+        assertStreamEquals(expectedBackoffs, backoffs);
+    }
+
+    @Test
+    public void testConstantPolicy() throws Exception {
+        Stream<Long> backoffs = Backoff.Constant.of(12345L, 10).toBackoffs();
+        Stream<Long> expectedBackoffs = LongStream.range(0L, 10L).mapToObj(i -> 12345L);
+        assertStreamEquals(expectedBackoffs, backoffs);
+    }
+
+    @Test
+    public void testEqualJittered() throws Exception {
+        Stream<Long> backoffs = Backoff.equalJittered(5, 120).limit(10);
+        Stream<Pair<Long, Long>> ranges = Stream.of(
+            Pair.of(5L, 10L),
+            Pair.of(10L, 20L),
+            Pair.of(20L, 40L),
+            Pair.of(40L, 80L),
+            Pair.of(80L, 120L),
+            Pair.of(80L, 120L),
+            Pair.of(80L, 120L),
+            Pair.of(80L, 120L),
+            Pair.of(80L, 120L)
+        );
+        StreamUtil.<Long, Pair<Long, Long>, Void>zip(backoffs, ranges, (backoff, maxPair) -> {
+            assertTrue(backoff >= maxPair.getLeft());
+            assertTrue(backoff <= maxPair.getRight());
+            return null;
+        });
+    }
+
+    @Test
+    public void testEqualJitteredPolicy() throws Exception {
+        Stream<Long> backoffs = Backoff.Jitter.of(EQUAL, 5, 120, 10).toBackoffs();
+        Stream<Pair<Long, Long>> ranges = Stream.of(
+            Pair.of(5L, 10L),
+            Pair.of(10L, 20L),
+            Pair.of(20L, 40L),
+            Pair.of(40L, 80L),
+            Pair.of(80L, 120L),
+            Pair.of(80L, 120L),
+            Pair.of(80L, 120L),
+            Pair.of(80L, 120L),
+            Pair.of(80L, 120L)
+        );
+        StreamUtil.<Long, Pair<Long, Long>, Void>zip(backoffs, ranges, (backoff, maxPair) -> {
+            assertTrue(backoff >= maxPair.getLeft());
+            assertTrue(backoff <= maxPair.getRight());
+            return null;
+        });
+    }
+
+    @Test
+    public void testDecorrelatedJittered() throws Exception {
+        long startMs = ThreadLocalRandom.current().nextLong(1L, 1000L);
+        long maxMs = ThreadLocalRandom.current().nextLong(startMs, startMs * 2);
+        Stream<Long> backoffs = Backoff.decorrelatedJittered(startMs, maxMs).limit(10);
+        Iterator<Long> backoffIter = backoffs.iterator();
+        assertTrue(backoffIter.hasNext());
+        assertEquals(startMs, backoffIter.next().longValue());
+        AtomicLong prevMs = new AtomicLong(startMs);
+        backoffIter.forEachRemaining(backoffMs -> {
+            assertTrue(backoffMs >= startMs);
+            assertTrue(backoffMs <= prevMs.get() * 3);
+            assertTrue(backoffMs <= maxMs);
+            prevMs.set(backoffMs);
+        });
+    }
+
+    @Test
+    public void testDecorrelatedJitteredPolicy() throws Exception {
+        long startMs = ThreadLocalRandom.current().nextLong(1L, 1000L);
+        long maxMs = ThreadLocalRandom.current().nextLong(startMs, startMs * 2);
+        Stream<Long> backoffs = Backoff.Jitter.of(DECORRELATED, startMs, maxMs, 10).toBackoffs();
+        Iterator<Long> backoffIter = backoffs.iterator();
+        assertTrue(backoffIter.hasNext());
+        assertEquals(startMs, backoffIter.next().longValue());
+        AtomicLong prevMs = new AtomicLong(startMs);
+        backoffIter.forEachRemaining(backoffMs -> {
+            assertTrue(backoffMs >= startMs);
+            assertTrue(backoffMs <= prevMs.get() * 3);
+            assertTrue(backoffMs <= maxMs);
+            prevMs.set(backoffMs);
+        });
+    }
+
+}
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestMathUtils.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestMathUtils.java
new file mode 100644
index 0000000..3bd58c2
--- /dev/null
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestMathUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import static org.apache.bookkeeper.common.util.MathUtils.findNextPositivePowerOfTwo;
+import static org.apache.bookkeeper.common.util.MathUtils.now;
+import static org.apache.bookkeeper.common.util.MathUtils.nowInNano;
+import static org.apache.bookkeeper.common.util.MathUtils.signSafeMod;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link MathUtils}.
+ */
+public class TestMathUtils {
+
+    @Test
+    public void testSignSafeMod() {
+        assertEquals(1, signSafeMod(11, 2));
+        assertEquals(1, signSafeMod(-11, 2));
+        assertEquals(1, signSafeMod(11, -2));
+        assertEquals(-3, signSafeMod(-11, -2));
+    }
+
+    @Test
+    public void testFindNextPositivePowerOfTwo() {
+        assertEquals(16384, findNextPositivePowerOfTwo(12345));
+    }
+
+    @Test
+    public void testNow() {
+        long nowInMillis = now();
+        assertTrue(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) >= nowInMillis);
+    }
+
+    @Test
+    public void testNowInNanos() {
+        long nowInNanos = nowInNano();
+        assertTrue(System.nanoTime() >= nowInNanos);
+    }
+
+}
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index 266685a..2042328 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -99,11 +99,6 @@
       </exclusions>
     </dependency>
     <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-lang3</artifactId>
-      <version>3.3.2</version>
-    </dependency>
-    <dependency>
       <groupId>commons-cli</groupId>
       <artifactId>commons-cli</artifactId>
       <version>1.2</version>
@@ -114,17 +109,17 @@
       <version>1.6</version>
     </dependency>
     <dependency>
-      <groupId>commons-lang</groupId>
-      <artifactId>commons-lang</artifactId>
-      <version>2.6</version>
-    </dependency>
-    <dependency>
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
       <version>2.4</version>
     </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>${commons-lang3.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
       <artifactId>commons-collections4</artifactId>
       <version>4.1</version>
     </dependency>
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Main.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Main.java
deleted file mode 100644
index 3e9583c..0000000
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Main.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package org.apache.bookkeeper.util;
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-import java.io.IOException;
-
-import org.apache.bookkeeper.proto.BookieClient;
-import org.apache.bookkeeper.proto.BookieServer;
-
-public class Main {
-
-    static void usage() {
-        System.err.println("USAGE: bookkeeper client|bookie");
-    }
-
-    /**
-     * @param args
-     * @throws InterruptedException
-     * @throws IOException
-     */
-    public static void main(String[] args) throws Exception {
-        if (args.length < 1 || !(args[0].equals("client") || args[0].equals("bookie"))) {
-            usage();
-            return;
-        }
-        String newArgs[] = new String[args.length - 1];
-        System.arraycopy(args, 1, newArgs, 0, newArgs.length);
-        if (args[0].equals("bookie")) {
-            BookieServer.main(newArgs);
-        } else {
-            BookieClient.main(newArgs);
-        }
-    }
-
-}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
index 1b3044d..d497b04 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
@@ -17,88 +17,10 @@
  */
 package org.apache.bookkeeper.util;
 
-import java.util.concurrent.TimeUnit;
-
 /**
  * Provides misc math functions that don't come standard
+ *
+ * @Deprecated since 4.6.0, in favor of using {@link org.apache.bookkeeper.common.util.MathUtils}.
  */
-public class MathUtils {
-    private static final long NANOSECONDS_PER_MILLISECOND = 1000000;
-    public static int signSafeMod(long dividend, int divisor) {
-        int mod = (int) (dividend % divisor);
-
-        if (mod < 0) {
-            mod += divisor;
-        }
-
-        return mod;
-
-    }
-
-    public static int findNextPositivePowerOfTwo(final int value) {
-        return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
-    }
-
-    /**
-     * Current time from some arbitrary time base in the past, counting in
-     * milliseconds, and not affected by settimeofday or similar system clock
-     * changes. This is appropriate to use when computing how much longer to
-     * wait for an interval to expire.
-     *
-     * NOTE: only use it for measuring.
-     * http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/System.html#nanoTime%28%29
-     *
-     * @return current time in milliseconds.
-     */
-    public static long now() {
-        return System.nanoTime() / NANOSECONDS_PER_MILLISECOND;
-    }
-
-    /**
-     * Current time from some arbitrary time base in the past, counting in
-     * nanoseconds, and not affected by settimeofday or similar system clock
-     * changes. This is appropriate to use when computing how much longer to
-     * wait for an interval to expire.
-     *
-     * NOTE: only use it for measuring.
-     * http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/System.html#nanoTime%28%29
-     *
-     * @return current time in nanoseconds.
-     */
-    public static long nowInNano() {
-        return System.nanoTime();
-    }
-
-    /**
-     * Milliseconds elapsed since the time specified, the input is nanoTime
-     * the only conversion happens when computing the elapsed time
-     *
-     * @param startNanoTime the start of the interval that we are measuring
-     * @return elapsed time in milliseconds.
-     */
-    public static long elapsedMSec (long startNanoTime) {
-       return (System.nanoTime() - startNanoTime)/ NANOSECONDS_PER_MILLISECOND;
-    }
-
-    /**
-     * Microseconds elapsed since the time specified, the input is nanoTime
-     * the only conversion happens when computing the elapsed time
-     *
-     * @param startNanoTime the start of the interval that we are measuring
-     * @return elapsed time in milliseconds.
-     */
-    public static long elapsedMicroSec(long startNanoTime) {
-        return TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startNanoTime);
-    }
-
-    /**
-     * Nanoseconds elapsed since the time specified, the input is nanoTime
-     * the only conversion happens when computing the elapsed time
-     *
-     * @param startNanoTime the start of the interval that we are measuring
-     * @return elapsed time in milliseconds.
-     */
-    public static long elapsedNanos(long startNanoTime) {
-       return System.nanoTime() - startNanoTime;
-    }
+public class MathUtils extends org.apache.bookkeeper.common.util.MathUtils {
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
index 76f0830..d832d18 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
@@ -17,26 +17,15 @@
  */
 package org.apache.bookkeeper.util;
 
-import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.Random;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,59 +43,15 @@ import org.slf4j.LoggerFactory;
  * achieved by hashing the key objects to threads by their {@link #hashCode()}
  * method.
  *
+ * @Deprecated since 4.6.0, in favor of using {@link org.apache.bookkeeper.common.util.OrderedScheduler}.
  */
-public class OrderedSafeExecutor {
-    final static long WARN_TIME_MICRO_SEC_DEFAULT = TimeUnit.SECONDS.toMicros(1);
-    final String name;
-    final ListeningScheduledExecutorService threads[];
-    final long threadIds[];
-    final Random rand = new Random();
-    final OpStatsLogger taskExecutionStats;
-    final OpStatsLogger taskPendingStats;
-    final boolean traceTaskExecution;
-    final long warnTimeMicroSec;
+public class OrderedSafeExecutor extends org.apache.bookkeeper.common.util.OrderedScheduler {
 
     public static Builder newBuilder() {
         return new Builder();
     }
 
-    public static class Builder {
-        private String name = "OrderedSafeExecutor";
-        private int numThreads = Runtime.getRuntime().availableProcessors();
-        private ThreadFactory threadFactory = null;
-        private StatsLogger statsLogger = NullStatsLogger.INSTANCE;
-        private boolean traceTaskExecution = false;
-        private long warnTimeMicroSec = WARN_TIME_MICRO_SEC_DEFAULT;
-
-        public Builder name(String name) {
-            this.name = name;
-            return this;
-        }
-
-        public Builder numThreads(int num) {
-            this.numThreads = num;
-            return this;
-        }
-
-        public Builder threadFactory(ThreadFactory threadFactory) {
-            this.threadFactory = threadFactory;
-            return this;
-        }
-
-        public Builder statsLogger(StatsLogger statsLogger) {
-            this.statsLogger = statsLogger;
-            return this;
-        }
-
-        public Builder traceTaskExecution(boolean enabled) {
-            this.traceTaskExecution = enabled;
-            return this;
-        }
-
-        public Builder traceTaskWarnTimeMicroSec(long warnTimeMicroSec) {
-            this.warnTimeMicroSec = warnTimeMicroSec;
-            return this;
-        }
+    public static class Builder extends AbstractBuilder<OrderedSafeExecutor> {
 
         public OrderedSafeExecutor build() {
             if (null == threadFactory) {
@@ -118,35 +63,6 @@ public class OrderedSafeExecutor {
 
     }
 
-    private class TimedRunnable extends SafeRunnable {
-        final SafeRunnable runnable;
-        final long initNanos;
-
-        TimedRunnable(SafeRunnable runnable) {
-            this.runnable = runnable;
-            this.initNanos = MathUtils.nowInNano();
-         }
-
-        @Override
-        public void safeRun() {
-            taskPendingStats.registerSuccessfulEvent(initNanos, TimeUnit.NANOSECONDS);
-            long startNanos = MathUtils.nowInNano();
-            this.runnable.safeRun();
-            long elapsedMicroSec = MathUtils.elapsedMicroSec(startNanos);
-            taskExecutionStats.registerSuccessfulEvent(elapsedMicroSec, TimeUnit.MICROSECONDS);
-            if (elapsedMicroSec >= warnTimeMicroSec) {
-                logger.warn("Runnable {}:{} took too long {} micros to execute.",
-                            new Object[] { runnable, runnable.getClass(), elapsedMicroSec });
-            }
-        }
-     }
-
-    @Deprecated
-    public OrderedSafeExecutor(int numThreads, String threadName) {
-        this(threadName, numThreads, Executors.defaultThreadFactory(), NullStatsLogger.INSTANCE,
-             false, WARN_TIME_MICRO_SEC_DEFAULT);
-    }
-
     /**
      * Constructs Safe executor
      *
@@ -166,121 +82,14 @@ public class OrderedSafeExecutor {
     private OrderedSafeExecutor(String baseName, int numThreads, ThreadFactory threadFactory,
                                 StatsLogger statsLogger, boolean traceTaskExecution,
                                 long warnTimeMicroSec) {
-        Preconditions.checkArgument(numThreads > 0);
-        Preconditions.checkArgument(!StringUtils.isBlank(baseName));
-
-        this.warnTimeMicroSec = warnTimeMicroSec;
-        name = baseName;
-        threads = new ListeningScheduledExecutorService[numThreads];
-        threadIds = new long[numThreads];
-        for (int i = 0; i < numThreads; i++) {
-            final ScheduledThreadPoolExecutor thread =  new ScheduledThreadPoolExecutor(1,
-                    new ThreadFactoryBuilder()
-                        .setNameFormat(name + "-orderedsafeexecutor-" + i + "-%d")
-                        .setThreadFactory(threadFactory)
-                        .build());
-            threads[i] = MoreExecutors.listeningDecorator(thread);
-            final int idx = i;
-            try {
-                threads[idx].submit(new SafeRunnable() {
-                    @Override
-                    public void safeRun() {
-                        threadIds[idx] = Thread.currentThread().getId();
-                    }
-                }).get();
-            } catch (InterruptedException e) {
-                throw new RuntimeException("Couldn't start thread " + i, e);
-            } catch (ExecutionException e) {
-                throw new RuntimeException("Couldn't start thread " + i, e);
-            }
-
-            // Register gauges
-            statsLogger.registerGauge(String.format("%s-queue-%d", name, idx), new Gauge<Number>() {
-                @Override
-                public Number getDefaultValue() {
-                    return 0;
-                }
-
-                @Override
-                public Number getSample() {
-                    return thread.getQueue().size();
-                }
-            });
-            statsLogger.registerGauge(String.format("%s-completed-tasks-%d", name, idx), new Gauge<Number>() {
-                @Override
-                public Number getDefaultValue() {
-                    return 0;
-                }
-
-                @Override
-                public Number getSample() {
-                    return thread.getCompletedTaskCount();
-                }
-            });
-            statsLogger.registerGauge(String.format("%s-total-tasks-%d", name, idx), new Gauge<Number>() {
-                @Override
-                public Number getDefaultValue() {
-                    return 0;
-                }
-
-                @Override
-                public Number getSample() {
-                    return thread.getTaskCount();
-                }
-            });
-        }
-
-        // Stats
-        this.taskExecutionStats = statsLogger.scope(name).getOpStatsLogger("task_execution");
-        this.taskPendingStats = statsLogger.scope(name).getOpStatsLogger("task_queued");
-        this.traceTaskExecution = traceTaskExecution;
-    }
-
-    public ListeningScheduledExecutorService chooseThread() {
-        // skip random # generation in this special case
-        if (threads.length == 1) {
-            return threads[0];
-        }
-
-        return threads[rand.nextInt(threads.length)];
-    }
-
-    public ListeningScheduledExecutorService chooseThread(Object orderingKey) {
-        // skip hashcode generation in this special case
-        if (threads.length == 1) {
-            return threads[0];
-        }
-
-        return threads[MathUtils.signSafeMod(orderingKey.hashCode(), threads.length)];
-    }
-
-    /**
-     * skip hashcode generation in this special case
-     *
-     * @param orderingKey long ordering key
-     * @return the thread for executing this order key
-     */
-    public ListeningScheduledExecutorService chooseThread(long orderingKey) {
-        if (threads.length == 1) {
-            return threads[0];
-        }
-
-        return threads[MathUtils.signSafeMod(orderingKey, threads.length)];
-    }
-
-    private SafeRunnable timedRunnable(SafeRunnable r) {
-        if (traceTaskExecution) {
-            return new TimedRunnable(r);
-        } else {
-            return r;
-        }
+        super(baseName, numThreads, threadFactory, statsLogger, traceTaskExecution, warnTimeMicroSec);
     }
 
     /**
      * schedules a one time action to execute
      */
     public void submit(SafeRunnable r) {
-        chooseThread().submit(timedRunnable(r));
+        super.submit(r);
     }
 
     /**
@@ -289,7 +98,7 @@ public class OrderedSafeExecutor {
      * @param r
      */
     public ListenableFuture<?> submitOrdered(Object orderingKey, SafeRunnable r) {
-        return chooseThread(orderingKey).submit(timedRunnable(r));
+        return super.submitOrdered(orderingKey, r);
     }
 
     /**
@@ -298,7 +107,7 @@ public class OrderedSafeExecutor {
      * @param r
      */
     public void submitOrdered(long orderingKey, SafeRunnable r) {
-        chooseThread(orderingKey).execute(r);
+        super.submitOrdered(orderingKey, r);
     }
 
     /**
@@ -307,18 +116,7 @@ public class OrderedSafeExecutor {
      * @param r
      */
     public void submitOrdered(int orderingKey, SafeRunnable r) {
-        chooseThread(orderingKey).execute(r);
-    }
-
-    /**
-     * schedules a one time action to execute with an ordering guarantee on the key.
-     *
-     * @param orderingKey
-     * @param callable
-     */
-    public <T> ListenableFuture<T> submitOrdered(Object orderingKey,
-                                                 java.util.concurrent.Callable<T> callable) {
-        return chooseThread(orderingKey).submit(callable);
+        super.submitOrdered(orderingKey, r);
     }
 
     /**
@@ -330,7 +128,7 @@ public class OrderedSafeExecutor {
      * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion
      */
     public ScheduledFuture<?> schedule(SafeRunnable command, long delay, TimeUnit unit) {
-        return chooseThread().schedule(command, delay, unit);
+        return super.schedule(command, delay, unit);
     }
 
     /**
@@ -343,7 +141,7 @@ public class OrderedSafeExecutor {
      * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion
      */
     public ScheduledFuture<?> scheduleOrdered(Object orderingKey, SafeRunnable command, long delay, TimeUnit unit) {
-        return chooseThread(orderingKey).schedule(command, delay, unit);
+        return super.scheduleOrdered(orderingKey, command, delay, unit);
     }
 
     /**
@@ -360,7 +158,7 @@ public class OrderedSafeExecutor {
      * method will throw an exception upon cancellation
      */
     public ScheduledFuture<?> scheduleAtFixedRate(SafeRunnable command, long initialDelay, long period, TimeUnit unit) {
-        return chooseThread().scheduleAtFixedRate(command, initialDelay, period, unit);
+        return super.scheduleAtFixedRate(command, initialDelay, period, unit);
     }
 
     /**
@@ -379,7 +177,7 @@ public class OrderedSafeExecutor {
      */
     public ScheduledFuture<?> scheduleAtFixedRateOrdered(Object orderingKey, SafeRunnable command, long initialDelay,
             long period, TimeUnit unit) {
-        return chooseThread(orderingKey).scheduleAtFixedRate(command, initialDelay, period, unit);
+        return super.scheduleAtFixedRateOrdered(orderingKey, command, initialDelay, period, unit);
     }
 
     /**
@@ -397,7 +195,7 @@ public class OrderedSafeExecutor {
      */
     public ScheduledFuture<?> scheduleWithFixedDelay(SafeRunnable command, long initialDelay, long delay,
             TimeUnit unit) {
-        return chooseThread().scheduleWithFixedDelay(command, initialDelay, delay, unit);
+        return super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
     }
 
     /**
@@ -416,48 +214,7 @@ public class OrderedSafeExecutor {
      */
     public ScheduledFuture<?> scheduleWithFixedDelayOrdered(Object orderingKey, SafeRunnable command, long initialDelay,
             long delay, TimeUnit unit) {
-        return chooseThread(orderingKey).scheduleWithFixedDelay(command, initialDelay, delay, unit);
-    }
-
-    private long getThreadID(long orderingKey) {
-        // skip hashcode generation in this special case
-        if (threadIds.length == 1) {
-            return threadIds[0];
-        }
-
-        return threadIds[MathUtils.signSafeMod(orderingKey, threadIds.length)];
-    }
-
-    public void shutdown() {
-        for (int i = 0; i < threads.length; i++) {
-            threads[i].shutdown();
-        }
-    }
-
-    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
-        boolean ret = true;
-        for (int i = 0; i < threads.length; i++) {
-            ret = ret && threads[i].awaitTermination(timeout, unit);
-        }
-        return ret;
-    }
-
-    /**
-     * Force threads shutdown (cancel active requests) after specified delay,
-     * to be used after shutdown() rejects new requests.
-     */
-    public void forceShutdown(long timeout, TimeUnit unit) {
-        for (int i = 0; i < threads.length; i++) {
-            try {
-                if (!threads[i].awaitTermination(timeout, unit)) {
-                    threads[i].shutdownNow();
-                }
-            }
-            catch (InterruptedException exception) {
-                threads[i].shutdownNow();
-                Thread.currentThread().interrupt();
-            }
-        }
+        return super.scheduleWithFixedDelayOrdered(orderingKey, command, initialDelay, delay, unit);
     }
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SafeRunnable.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SafeRunnable.java
index 8b1e0d0..7e56cd1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SafeRunnable.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SafeRunnable.java
@@ -1,7 +1,3 @@
-package org.apache.bookkeeper.util;
-
-import java.util.function.Consumer;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -19,24 +15,11 @@ import java.util.function.Consumer;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.bookkeeper.util;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class SafeRunnable implements Runnable {
-
-    static final Logger logger = LoggerFactory.getLogger(SafeRunnable.class);
-
-    @Override
-    public void run() {
-        try {
-            safeRun();
-        } catch(Throwable t) {
-            logger.error("Unexpected throwable caught ", t);
-        }
-    }
+import java.util.function.Consumer;
 
-    public abstract void safeRun();
+public abstract class SafeRunnable implements org.apache.bookkeeper.common.util.SafeRunnable {
 
     /**
      * Utility method to use SafeRunnable from lambdas
diff --git a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
index efee56a..1925cde 100644
--- a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
+++ b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
@@ -46,6 +46,27 @@
     <Method name="getData" />
     <Bug pattern="EI_EXPOSE_REP" />
   </Match>
+  <Match>
+    <Class name="org.apache.bookkeeper.common.concurrent.FutureUtils"/>
+    <Bug pattern="NP_NULL_PARAM_DEREF_NONVIRTUAL" />
+  </Match>
+  <Match>
+    <Class name="org.apache.bookkeeper.common.concurrent.FutureUtils$1"/>
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.bookkeeper.common.concurrent.FutureUtils"/>
+    <Method name="Void" />
+    <Bug pattern="NM_METHOD_NAMING_CONVENTION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.bookkeeper.util.MathUtils"/>
+    <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS" />
+  </Match>
+  <Match>
+    <Class name="org.apache.bookkeeper.util.SafeRunnable"/>
+    <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
+  </Match>
   <And>
     <Bug category="MT_CORRECTNESS"/>
     <Class name="~org.apache.bookkeeper.util.collections\.[^.]+"/>
diff --git a/pom.xml b/pom.xml
index 9879022..d5b066f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,6 +98,8 @@
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
     <!-- dependencies -->
     <commons-configuration.version>1.10</commons-configuration.version>
+    <commons-lang3.version>3.3.2</commons-lang3.version>
+    <google.code.version>3.0.2</google.code.version>
     <guava.version>20.0</guava.version>
     <hamcrest.version>1.3</hamcrest.version>
     <jmh.version>1.19</jmh.version>

-- 
To stop receiving notification emails like this one, please contact
['"commits@bookkeeper.apache.org" <commits@bookkeeper.apache.org>'].

Mime
View raw message