bookkeeper-distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [11/23] incubator-distributedlog git commit: DL-124: Use Java8 Future rather than twitter Future
Date Wed, 21 Jun 2017 17:20:43 GMT
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
index a8eb482..77151df 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
@@ -19,28 +19,24 @@ package org.apache.distributedlog.lock;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
-import com.twitter.concurrent.AsyncSemaphore;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.distributedlog.exceptions.LockingException;
 import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
 import org.apache.distributedlog.exceptions.UnexpectedException;
-import org.apache.distributedlog.util.FutureUtils;
-import org.apache.distributedlog.util.FutureUtils.OrderedFutureEventListener;
+import org.apache.distributedlog.common.concurrent.AsyncSemaphore;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction0;
-import scala.runtime.BoxedUnit;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Distributed lock, using ZooKeeper.
@@ -78,22 +74,22 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
     private final long lockTimeout;
     private final DistributedLockContext lockContext = new DistributedLockContext();
 
-    private final AsyncSemaphore lockSemaphore = new AsyncSemaphore(1);
+    private final AsyncSemaphore lockSemaphore = new AsyncSemaphore(1, Optional.empty());
     // We have two lock acquire futures:
     // 1. lock acquire future: for the initial acquire op
     // 2. lock reacquire future: for reacquire necessary when session expires, lock is closed
-    private Future<ZKDistributedLock> lockAcquireFuture = null;
-    private Future<ZKDistributedLock> lockReacquireFuture = null;
+    private CompletableFuture<ZKDistributedLock> lockAcquireFuture = null;
+    private CompletableFuture<ZKDistributedLock> lockReacquireFuture = null;
     // following variable tracking the status of acquire process
     //   => create (internalLock) => tryLock (tryLockFuture) => waitForAcquire (lockWaiter)
     private SessionLock internalLock = null;
-    private Future<LockWaiter> tryLockFuture = null;
+    private CompletableFuture<LockWaiter> tryLockFuture = null;
     private LockWaiter lockWaiter = null;
     // exception indicating if the reacquire failed
     private LockingException lockReacquireException = null;
     // closeFuture
     private volatile boolean closed = false;
-    private Future<Void> closeFuture = null;
+    private CompletableFuture<Void> closeFuture = null;
 
     // A counter to track how many re-acquires happened during a lock's life cycle.
     private final AtomicInteger reacquireCount = new AtomicInteger(0);
@@ -136,25 +132,19 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
      * Asynchronously acquire the lock. Technically the try phase of this operation--which adds us to the waiter
      * list--is executed synchronously, but the lock wait itself doesn't block.
      */
-    public synchronized Future<ZKDistributedLock> asyncAcquire() {
+    public synchronized CompletableFuture<ZKDistributedLock> asyncAcquire() {
         if (null != lockAcquireFuture) {
-            return Future.exception(new UnexpectedException("Someone is already acquiring/acquired lock " + lockPath));
+            return FutureUtils.exception(new UnexpectedException("Someone is already acquiring/acquired lock " + lockPath));
         }
-        final Promise<ZKDistributedLock> promise =
-                new Promise<ZKDistributedLock>(new Function<Throwable, BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(Throwable cause) {
-                lockStateExecutor.submit(lockPath, new Runnable() {
-                    @Override
-                    public void run() {
-                        asyncClose();
-                    }
-                });
-                return BoxedUnit.UNIT;
+        final CompletableFuture<ZKDistributedLock> promise = FutureUtils.createFuture();
+        promise.whenComplete((zkDistributedLock, throwable) -> {
+            if (null == throwable || !(throwable instanceof CancellationException)) {
+                return;
             }
+            lockStateExecutor.submit(lockPath, () -> asyncClose());
         });
         final Stopwatch stopwatch = Stopwatch.createStarted();
-        promise.addEventListener(new FutureEventListener<ZKDistributedLock>() {
+        promise.whenComplete(new FutureEventListener<ZKDistributedLock>() {
             @Override
             public void onSuccess(ZKDistributedLock lock) {
                 acquireStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
@@ -176,41 +166,39 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
         return promise;
     }
 
-    void doAsyncAcquireWithSemaphore(final Promise<ZKDistributedLock> acquirePromise,
+    void doAsyncAcquireWithSemaphore(final CompletableFuture<ZKDistributedLock> acquirePromise,
                                      final long lockTimeout) {
-        lockSemaphore.acquireAndRun(new AbstractFunction0<Future<ZKDistributedLock>>() {
-            @Override
-            public Future<ZKDistributedLock> apply() {
-                doAsyncAcquire(acquirePromise, lockTimeout);
-                return acquirePromise;
-            }
+        lockSemaphore.acquireAndRun(() -> {
+            doAsyncAcquire(acquirePromise, lockTimeout);
+            return acquirePromise;
         });
     }
 
-    void doAsyncAcquire(final Promise<ZKDistributedLock> acquirePromise,
+    void doAsyncAcquire(final CompletableFuture<ZKDistributedLock> acquirePromise,
                         final long lockTimeout) {
         LOG.trace("Async Lock Acquire {}", lockPath);
         try {
             checkLockState();
         } catch (IOException ioe) {
-            FutureUtils.setException(acquirePromise, ioe);
+            FutureUtils.completeExceptionally(acquirePromise, ioe);
             return;
         }
 
         if (haveLock()) {
             // it already hold the lock
-            FutureUtils.setValue(acquirePromise, this);
+            FutureUtils.complete(acquirePromise, this);
             return;
         }
 
-        lockFactory.createLock(lockPath, lockContext).addEventListener(OrderedFutureEventListener.of(
-                new FutureEventListener<SessionLock>() {
+        lockFactory
+            .createLock(lockPath, lockContext)
+            .whenCompleteAsync(new FutureEventListener<SessionLock>() {
             @Override
             public void onSuccess(SessionLock lock) {
                 synchronized (ZKDistributedLock.this) {
                     if (closed) {
                         LOG.info("Skipping tryLocking lock {} since it is already closed", lockPath);
-                        FutureUtils.setException(acquirePromise, newLockClosedException());
+                        FutureUtils.completeExceptionally(acquirePromise, newLockClosedException());
                         return;
                     }
                 }
@@ -223,62 +211,64 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
 
             @Override
             public void onFailure(Throwable cause) {
-                FutureUtils.setException(acquirePromise, cause);
+                FutureUtils.completeExceptionally(acquirePromise, cause);
             }
-        }, lockStateExecutor, lockPath));
+        }, lockStateExecutor.chooseExecutor(lockPath));
     }
 
     void asyncTryLock(SessionLock lock,
-                      final Promise<ZKDistributedLock> acquirePromise,
+                      final CompletableFuture<ZKDistributedLock> acquirePromise,
                       final long lockTimeout) {
         if (null != tryLockFuture) {
-            tryLockFuture.cancel();
+            tryLockFuture.cancel(true);
         }
         tryLockFuture = lock.asyncTryLock(lockTimeout, TimeUnit.MILLISECONDS);
-        tryLockFuture.addEventListener(OrderedFutureEventListener.of(
-                new FutureEventListener<LockWaiter>() {
-                    @Override
-                    public void onSuccess(LockWaiter waiter) {
-                        synchronized (ZKDistributedLock.this) {
-                            if (closed) {
-                                LOG.info("Skipping acquiring lock {} since it is already closed", lockPath);
-                                waiter.getAcquireFuture().raise(new LockingException(lockPath, "lock is already closed."));
-                                FutureUtils.setException(acquirePromise, newLockClosedException());
-                                return;
-                            }
+        tryLockFuture.whenCompleteAsync(
+            new FutureEventListener<LockWaiter>() {
+                @Override
+                public void onSuccess(LockWaiter waiter) {
+                    synchronized (ZKDistributedLock.this) {
+                        if (closed) {
+                            LOG.info("Skipping acquiring lock {} since it is already closed", lockPath);
+                            waiter
+                                .getAcquireFuture()
+                                .completeExceptionally(new LockingException(lockPath, "lock is already closed."));
+                            FutureUtils.completeExceptionally(acquirePromise, newLockClosedException());
+                            return;
                         }
-                        tryLockFuture = null;
-                        lockWaiter = waiter;
-                        waitForAcquire(waiter, acquirePromise);
                     }
+                    tryLockFuture = null;
+                    lockWaiter = waiter;
+                    waitForAcquire(waiter, acquirePromise);
+                }
 
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        FutureUtils.setException(acquirePromise, cause);
-                    }
-                }, lockStateExecutor, lockPath));
+                @Override
+                public void onFailure(Throwable cause) {
+                    FutureUtils.completeExceptionally(acquirePromise, cause);
+                }
+            }, lockStateExecutor.chooseExecutor(lockPath));
     }
 
     void waitForAcquire(final LockWaiter waiter,
-                        final Promise<ZKDistributedLock> acquirePromise) {
-        waiter.getAcquireFuture().addEventListener(OrderedFutureEventListener.of(
-                new FutureEventListener<Boolean>() {
-                    @Override
-                    public void onSuccess(Boolean acquired) {
-                        LOG.info("{} acquired lock {}", waiter, lockPath);
-                        if (acquired) {
-                            FutureUtils.setValue(acquirePromise, ZKDistributedLock.this);
-                        } else {
-                            FutureUtils.setException(acquirePromise,
-                                    new OwnershipAcquireFailedException(lockPath, waiter.getCurrentOwner()));
-                        }
+                        final CompletableFuture<ZKDistributedLock> acquirePromise) {
+        waiter.getAcquireFuture().whenCompleteAsync(
+            new FutureEventListener<Boolean>() {
+                @Override
+                public void onSuccess(Boolean acquired) {
+                    LOG.info("{} acquired lock {}", waiter, lockPath);
+                    if (acquired) {
+                        FutureUtils.complete(acquirePromise, ZKDistributedLock.this);
+                    } else {
+                        FutureUtils.completeExceptionally(acquirePromise,
+                                new OwnershipAcquireFailedException(lockPath, waiter.getCurrentOwner()));
                     }
+                }
 
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        FutureUtils.setException(acquirePromise, cause);
-                    }
-                }, lockStateExecutor, lockPath));
+                @Override
+                public void onFailure(Throwable cause) {
+                    FutureUtils.completeExceptionally(acquirePromise, cause);
+                }
+            }, lockStateExecutor.chooseExecutor(lockPath));
     }
 
     /**
@@ -300,7 +290,7 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
      * @throws LockingException     if the lock attempt fails
      */
     public synchronized void checkOwnershipAndReacquire() throws LockingException {
-        if (null == lockAcquireFuture || !lockAcquireFuture.isDefined()) {
+        if (null == lockAcquireFuture || !lockAcquireFuture.isDone()) {
             throw new LockingException(lockPath, "check ownership before acquiring");
         }
 
@@ -322,7 +312,7 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
      * @throws LockingException     if the lock attempt fails
      */
     public synchronized void checkOwnership() throws LockingException {
-        if (null == lockAcquireFuture || !lockAcquireFuture.isDefined()) {
+        if (null == lockAcquireFuture || !lockAcquireFuture.isDone()) {
             throw new LockingException(lockPath, "check ownership before acquiring");
         }
         if (!haveLock()) {
@@ -336,12 +326,12 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
     }
 
     @VisibleForTesting
-    synchronized Future<ZKDistributedLock> getLockReacquireFuture() {
+    synchronized CompletableFuture<ZKDistributedLock> getLockReacquireFuture() {
         return lockReacquireFuture;
     }
 
     @VisibleForTesting
-    synchronized Future<ZKDistributedLock> getLockAcquireFuture() {
+    synchronized CompletableFuture<ZKDistributedLock> getLockAcquireFuture() {
         return lockAcquireFuture;
     }
 
@@ -360,71 +350,65 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
     }
 
     void closeWaiter(final LockWaiter waiter,
-                     final Promise<Void> closePromise) {
+                     final CompletableFuture<Void> closePromise) {
         if (null == waiter) {
             interruptTryLock(tryLockFuture, closePromise);
         } else {
-            waiter.getAcquireFuture().addEventListener(OrderedFutureEventListener.of(
-                    new FutureEventListener<Boolean>() {
-                        @Override
-                        public void onSuccess(Boolean value) {
-                            unlockInternalLock(closePromise);
-                        }
-                        @Override
-                        public void onFailure(Throwable cause) {
-                            unlockInternalLock(closePromise);
-                        }
-                    }, lockStateExecutor, lockPath));
-            FutureUtils.cancel(waiter.getAcquireFuture());
+            waiter.getAcquireFuture().whenCompleteAsync(
+                new FutureEventListener<Boolean>() {
+                    @Override
+                    public void onSuccess(Boolean value) {
+                        unlockInternalLock(closePromise);
+                    }
+                    @Override
+                    public void onFailure(Throwable cause) {
+                        unlockInternalLock(closePromise);
+                    }
+                }, lockStateExecutor.chooseExecutor(lockPath));
+            waiter.getAcquireFuture().cancel(true);
         }
     }
 
-    void interruptTryLock(final Future<LockWaiter> tryLockFuture,
-                          final Promise<Void> closePromise) {
+    void interruptTryLock(final CompletableFuture<LockWaiter> tryLockFuture,
+                          final CompletableFuture<Void> closePromise) {
         if (null == tryLockFuture) {
             unlockInternalLock(closePromise);
         } else {
-            tryLockFuture.addEventListener(OrderedFutureEventListener.of(
-                    new FutureEventListener<LockWaiter>() {
-                        @Override
-                        public void onSuccess(LockWaiter waiter) {
-                            closeWaiter(waiter, closePromise);
-                        }
-                        @Override
-                        public void onFailure(Throwable cause) {
-                            unlockInternalLock(closePromise);
-                        }
-                    }, lockStateExecutor, lockPath));
-            FutureUtils.cancel(tryLockFuture);
+            tryLockFuture.whenCompleteAsync(
+                new FutureEventListener<LockWaiter>() {
+                    @Override
+                    public void onSuccess(LockWaiter waiter) {
+                        closeWaiter(waiter, closePromise);
+                    }
+                    @Override
+                    public void onFailure(Throwable cause) {
+                        unlockInternalLock(closePromise);
+                    }
+                }, lockStateExecutor.chooseExecutor(lockPath));
+            tryLockFuture.cancel(true);
         }
     }
 
-    synchronized void unlockInternalLock(final Promise<Void> closePromise) {
+    synchronized void unlockInternalLock(final CompletableFuture<Void> closePromise) {
         if (internalLock == null) {
-            FutureUtils.setValue(closePromise, null);
+            FutureUtils.complete(closePromise, null);
         } else {
-            internalLock.asyncUnlock().ensure(new AbstractFunction0<BoxedUnit>() {
-                @Override
-                public BoxedUnit apply() {
-                    FutureUtils.setValue(closePromise, null);
-                    return BoxedUnit.UNIT;
-                }
-            });
+            internalLock.asyncUnlock().whenComplete((value, cause) -> closePromise.complete(null));
         }
     }
 
     @Override
-    public Future<Void> asyncClose() {
-        final Promise<Void> closePromise;
+    public CompletableFuture<Void> asyncClose() {
+        final CompletableFuture<Void> closePromise;
         synchronized (this) {
             if (closed) {
                 return closeFuture;
             }
             closed = true;
-            closeFuture = closePromise = new Promise<Void>();
+            closeFuture = closePromise = new CompletableFuture<Void>();
         }
-        final Promise<Void> closeWaiterFuture = new Promise<Void>();
-        closeWaiterFuture.addEventListener(OrderedFutureEventListener.of(new FutureEventListener<Void>() {
+        final CompletableFuture<Void> closeWaiterFuture = new CompletableFuture<Void>();
+        closeWaiterFuture.whenCompleteAsync(new FutureEventListener<Void>() {
             @Override
             public void onSuccess(Void value) {
                 complete();
@@ -435,9 +419,9 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
             }
 
             private void complete() {
-                FutureUtils.setValue(closePromise, null);
+                FutureUtils.complete(closePromise, null);
             }
-        }, lockStateExecutor, lockPath));
+        }, lockStateExecutor.chooseExecutor(lockPath));
         lockStateExecutor.submit(lockPath, new Runnable() {
             @Override
             public void run() {
@@ -449,7 +433,7 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
 
     void internalReacquireLock(final AtomicInteger numRetries,
                                final long lockTimeout,
-                               final Promise<ZKDistributedLock> reacquirePromise) {
+                               final CompletableFuture<ZKDistributedLock> reacquirePromise) {
         lockStateExecutor.submit(lockPath, new Runnable() {
             @Override
             public void run() {
@@ -460,25 +444,25 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
 
     void doInternalReacquireLock(final AtomicInteger numRetries,
                                  final long lockTimeout,
-                                 final Promise<ZKDistributedLock> reacquirePromise) {
+                                 final CompletableFuture<ZKDistributedLock> reacquirePromise) {
         internalTryRetries.inc();
-        Promise<ZKDistributedLock> tryPromise = new Promise<ZKDistributedLock>();
-        tryPromise.addEventListener(new FutureEventListener<ZKDistributedLock>() {
+        CompletableFuture<ZKDistributedLock> tryPromise = new CompletableFuture<ZKDistributedLock>();
+        tryPromise.whenComplete(new FutureEventListener<ZKDistributedLock>() {
             @Override
             public void onSuccess(ZKDistributedLock lock) {
-                FutureUtils.setValue(reacquirePromise, lock);
+                FutureUtils.complete(reacquirePromise, lock);
             }
 
             @Override
             public void onFailure(Throwable cause) {
                 if (cause instanceof OwnershipAcquireFailedException) {
                     // the lock has been acquired by others
-                    FutureUtils.setException(reacquirePromise, cause);
+                    FutureUtils.completeExceptionally(reacquirePromise, cause);
                 } else {
                     if (numRetries.getAndDecrement() > 0 && !closed) {
                         internalReacquireLock(numRetries, lockTimeout, reacquirePromise);
                     } else {
-                        FutureUtils.setException(reacquirePromise, cause);
+                        FutureUtils.completeExceptionally(reacquirePromise, cause);
                     }
                 }
             }
@@ -486,9 +470,9 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
         doAsyncAcquireWithSemaphore(tryPromise, 0);
     }
 
-    private Future<ZKDistributedLock> reacquireLock(boolean throwLockAcquireException) throws LockingException {
+    private CompletableFuture<ZKDistributedLock> reacquireLock(boolean throwLockAcquireException) throws LockingException {
         final Stopwatch stopwatch = Stopwatch.createStarted();
-        Promise<ZKDistributedLock> lockPromise;
+        CompletableFuture<ZKDistributedLock> lockPromise;
         synchronized (this) {
             if (closed) {
                 throw newLockClosedException();
@@ -504,8 +488,8 @@ public class ZKDistributedLock implements LockListener, DistributedLock {
                 return lockReacquireFuture;
             }
             LOG.info("reacquiring lock at {}", lockPath);
-            lockReacquireFuture = lockPromise = new Promise<ZKDistributedLock>();
-            lockReacquireFuture.addEventListener(new FutureEventListener<ZKDistributedLock>() {
+            lockReacquireFuture = lockPromise = new CompletableFuture<ZKDistributedLock>();
+            lockReacquireFuture.whenComplete(new FutureEventListener<ZKDistributedLock>() {
                 @Override
                 public void onSuccess(ZKDistributedLock lock) {
                     // if re-acquire successfully, clear the state.

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
index e2699e7..9fdcbf1 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
@@ -17,8 +17,22 @@
  */
 package org.apache.distributedlog.lock;
 
+import static com.google.common.base.Charsets.UTF_8;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 import org.apache.distributedlog.DistributedLogConstants;
 import org.apache.distributedlog.util.FailpointUtils;
 import org.apache.distributedlog.exceptions.LockingException;
@@ -27,18 +41,10 @@ import org.apache.distributedlog.exceptions.DLInterruptedException;
 import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
 import org.apache.distributedlog.exceptions.UnexpectedException;
 import org.apache.distributedlog.exceptions.ZKException;
-import org.apache.distributedlog.stats.OpStatsListener;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.stats.OpStatsListener;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Function0;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import com.twitter.util.Return;
-import com.twitter.util.Throw;
-import com.twitter.util.TimeoutException;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -54,20 +60,6 @@ import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static com.google.common.base.Charsets.UTF_8;
 
 /**
  * A lock under a given zookeeper session. This is a one-time lock.
@@ -276,7 +268,7 @@ class ZKSessionLock implements SessionLock {
     private StateManagement lockState;
     private final DistributedLockContext lockContext;
 
-    private final Promise<Boolean> acquireFuture;
+    private final CompletableFuture<Boolean> acquireFuture;
     private String currentId;
     private String currentNode;
     private String watchedNode;
@@ -342,15 +334,14 @@ class ZKSessionLock implements SessionLock {
         this.unlockStats = statsLogger.getOpStatsLogger("unlock");
 
         // Attach interrupt handler to acquire future so clients can abort the future.
-        this.acquireFuture = new Promise<Boolean>(new com.twitter.util.Function<Throwable, BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(Throwable t) {
+        this.acquireFuture = FutureUtils.createFuture();
+        this.acquireFuture.whenComplete((value, cause) -> {
+            if (null != cause) {
                 // This will set the lock state to closed, and begin to cleanup the zk lock node.
                 // We have to be careful not to block here since doing so blocks the ordered lock
                 // state executor which can cause deadlocks depending on how futures are chained.
-                ZKSessionLock.this.asyncUnlock(t);
+                ZKSessionLock.this.asyncUnlock(cause);
                 // Note re. logging and exceptions: errors are already logged by unlockAsync.
-                return BoxedUnit.UNIT;
             }
         });
     }
@@ -433,7 +424,7 @@ class ZKSessionLock implements SessionLock {
      * @param promise
      *          promise
      */
-    protected <T> void executeLockAction(final int lockEpoch, final LockAction func, final Promise<T> promise) {
+    protected <T> void executeLockAction(final int lockEpoch, final LockAction func, final CompletableFuture<T> promise) {
         lockStateExecutor.submit(lockPath, new SafeRunnable() {
             @Override
             public void safeRun() {
@@ -453,7 +444,7 @@ class ZKSessionLock implements SessionLock {
                         LOG.trace("{} skipped executing lock action '{}' for lock {}, since epoch is changed from {} to {}.",
                                 new Object[]{lockId, func.getActionName(), lockPath, lockEpoch, currentEpoch});
                     }
-                    promise.setException(new EpochChangedException(lockPath, lockEpoch, currentEpoch));
+                    promise.completeExceptionally(new EpochChangedException(lockPath, lockEpoch, currentEpoch));
                 }
             }
         });
@@ -516,7 +507,7 @@ class ZKSessionLock implements SessionLock {
      *          node name
      * @return client id and its ephemeral owner.
      */
-    static Future<Pair<String, Long>> asyncParseClientID(ZooKeeper zkClient, String lockPath, String nodeName) {
+    static CompletableFuture<Pair<String, Long>> asyncParseClientID(ZooKeeper zkClient, String lockPath, String nodeName) {
         String[] parts = nodeName.split("_");
         // member_<clientid>_s<owner_session>_
         if (4 == parts.length && parts[2].startsWith("s")) {
@@ -524,19 +515,19 @@ class ZKSessionLock implements SessionLock {
             String clientId;
             try {
                 clientId = URLDecoder.decode(parts[1], UTF_8.name());
-                return Future.value(Pair.of(clientId, sessionOwner));
+                return FutureUtils.value(Pair.of(clientId, sessionOwner));
             } catch (UnsupportedEncodingException e) {
                 // if failed to parse client id, we have to get client id by zookeeper#getData.
             }
         }
-        final Promise<Pair<String, Long>> promise = new Promise<Pair<String, Long>>();
+        final CompletableFuture<Pair<String, Long>> promise = new CompletableFuture<Pair<String, Long>>();
         zkClient.getData(lockPath + "/" + nodeName, false, new AsyncCallback.DataCallback() {
             @Override
             public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                 if (KeeperException.Code.OK.intValue() != rc) {
-                    promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
+                    promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
                 } else {
-                    promise.setValue(Pair.of(deserializeClientId(data), stat.getEphemeralOwner()));
+                    promise.complete(Pair.of(deserializeClientId(data), stat.getEphemeralOwner()));
                 }
             }
         }, null);
@@ -544,8 +535,8 @@ class ZKSessionLock implements SessionLock {
     }
 
     @Override
-    public Future<LockWaiter> asyncTryLock(final long timeout, final TimeUnit unit) {
-        final Promise<String> result = new Promise<String>();
+    public CompletableFuture<LockWaiter> asyncTryLock(final long timeout, final TimeUnit unit) {
+        final CompletableFuture<String> result = new CompletableFuture<String>();
         final boolean wait = DistributedLogConstants.LOCK_IMMEDIATE != timeout;
         if (wait) {
             asyncTryLock(wait, result);
@@ -559,11 +550,11 @@ class ZKSessionLock implements SessionLock {
                         @Override
                         public void safeRun() {
                             if (!lockState.inState(State.INIT)) {
-                                result.setException(new LockStateChangedException(lockPath, lockId, State.INIT, lockState.getState()));
+                                result.completeExceptionally(new LockStateChangedException(lockPath, lockId, State.INIT, lockState.getState()));
                                 return;
                             }
                             if (KeeperException.Code.OK.intValue() != rc) {
-                                result.setException(KeeperException.create(KeeperException.Code.get(rc)));
+                                result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
                                 return;
                             }
 
@@ -571,25 +562,20 @@ class ZKSessionLock implements SessionLock {
 
                             Collections.sort(children, MEMBER_COMPARATOR);
                             if (children.size() > 0) {
-                                asyncParseClientID(zk, lockPath, children.get(0)).addEventListener(
+                                asyncParseClientID(zk, lockPath, children.get(0)).whenCompleteAsync(
                                         new FutureEventListener<Pair<String, Long>>() {
                                             @Override
                                             public void onSuccess(Pair<String, Long> owner) {
                                                 if (!checkOrClaimLockOwner(owner, result)) {
-                                                    acquireFuture.updateIfEmpty(new Return<Boolean>(false));
+                                                    acquireFuture.complete(false);
                                                 }
                                             }
 
                                             @Override
                                             public void onFailure(final Throwable cause) {
-                                                lockStateExecutor.submit(lockPath, new SafeRunnable() {
-                                                    @Override
-                                                    public void safeRun() {
-                                                        result.setException(cause);
-                                                    }
-                                                });
+                                                result.completeExceptionally(cause);
                                             }
-                                        });
+                                        }, lockStateExecutor.chooseExecutor(lockPath));
                             } else {
                                 asyncTryLock(wait, result);
                             }
@@ -599,14 +585,9 @@ class ZKSessionLock implements SessionLock {
             }, null);
         }
 
-        final Promise<Boolean> waiterAcquireFuture = new Promise<Boolean>(new com.twitter.util.Function<Throwable, BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(Throwable t) {
-                acquireFuture.raise(t);
-                return BoxedUnit.UNIT;
-            }
-        });
-        return result.map(new AbstractFunction1<String, LockWaiter>() {
+        final CompletableFuture<Boolean> waiterAcquireFuture = FutureUtils.createFuture();
+        waiterAcquireFuture.whenComplete((value, cause) -> acquireFuture.completeExceptionally(cause));
+        return result.thenApply(new Function<String, LockWaiter>() {
             @Override
             public LockWaiter apply(final String currentOwner) {
                 final Exception acquireException = new OwnershipAcquireFailedException(lockPath, currentOwner);
@@ -617,7 +598,7 @@ class ZKSessionLock implements SessionLock {
                         acquireException,
                         lockStateExecutor,
                         lockPath
-                ).addEventListener(new FutureEventListener<Boolean>() {
+                ).whenComplete(new FutureEventListener<Boolean>() {
 
                     @Override
                     public void onSuccess(Boolean acquired) {
@@ -631,17 +612,17 @@ class ZKSessionLock implements SessionLock {
 
                     private void completeOrFail(final Throwable acquireCause) {
                         if (isLockHeld()) {
-                            waiterAcquireFuture.setValue(true);
+                            waiterAcquireFuture.complete(true);
                         } else {
-                            asyncUnlock().addEventListener(new FutureEventListener<BoxedUnit>() {
+                            asyncUnlock().whenComplete(new FutureEventListener<Void>() {
                                 @Override
-                                public void onSuccess(BoxedUnit value) {
-                                    waiterAcquireFuture.setException(acquireCause);
+                                public void onSuccess(Void value) {
+                                    waiterAcquireFuture.completeExceptionally(acquireCause);
                                 }
 
                                 @Override
                                 public void onFailure(Throwable cause) {
-                                    waiterAcquireFuture.setException(acquireCause);
+                                    waiterAcquireFuture.completeExceptionally(acquireCause);
                                 }
                             });
                         }
@@ -656,12 +637,12 @@ class ZKSessionLock implements SessionLock {
     }
 
     private boolean checkOrClaimLockOwner(final Pair<String, Long> currentOwner,
-                                          final Promise<String> result) {
+                                          final CompletableFuture<String> result) {
         if (lockId.compareTo(currentOwner) != 0 && !lockContext.hasLockId(currentOwner)) {
             lockStateExecutor.submit(lockPath, new SafeRunnable() {
                 @Override
                 public void safeRun() {
-                    result.setValue(currentOwner.getLeft());
+                    result.complete(currentOwner.getLeft());
                 }
             });
             return false;
@@ -672,7 +653,7 @@ class ZKSessionLock implements SessionLock {
             @Override
             public void execute() {
                 if (!lockState.inState(State.INIT)) {
-                    result.setException(new LockStateChangedException(lockPath, lockId, State.INIT, lockState.getState()));
+                    result.completeExceptionally(new LockStateChangedException(lockPath, lockId, State.INIT, lockState.getState()));
                     return;
                 }
                 asyncTryLock(false, result);
@@ -693,12 +674,12 @@ class ZKSessionLock implements SessionLock {
      * @param result
      *          promise to satisfy with current lock owner
      */
-    private void asyncTryLock(boolean wait, final Promise<String> result) {
-        final Promise<String> lockResult = new Promise<String>();
-        lockResult.addEventListener(new FutureEventListener<String>() {
+    private void asyncTryLock(boolean wait, final CompletableFuture<String> result) {
+        final CompletableFuture<String> lockResult = new CompletableFuture<String>();
+        lockResult.whenComplete(new FutureEventListener<String>() {
             @Override
             public void onSuccess(String currentOwner) {
-                result.setValue(currentOwner);
+                result.complete(currentOwner);
             }
 
             @Override
@@ -707,7 +688,7 @@ class ZKSessionLock implements SessionLock {
                 if (lockCause instanceof LockStateChangedException) {
                     LOG.info("skipping cleanup for {} at {} after encountering lock " +
                             "state change exception : ", new Object[] { lockId, lockPath, lockCause });
-                    result.setException(lockCause);
+                    result.completeExceptionally(lockCause);
                     return;
                 }
                 if (LOG.isDebugEnabled()) {
@@ -716,15 +697,15 @@ class ZKSessionLock implements SessionLock {
                 }
 
                 // If we encountered any exception we should cleanup
-                Future<BoxedUnit> unlockResult = asyncUnlock();
-                unlockResult.addEventListener(new FutureEventListener<BoxedUnit>() {
+                CompletableFuture<Void> unlockResult = asyncUnlock();
+                unlockResult.whenComplete(new FutureEventListener<Void>() {
                     @Override
-                    public void onSuccess(BoxedUnit value) {
-                        result.setException(lockCause);
+                    public void onSuccess(Void value) {
+                        result.completeExceptionally(lockCause);
                     }
                     @Override
                     public void onFailure(Throwable cause) {
-                        result.setException(lockCause);
+                        result.completeExceptionally(lockCause);
                     }
                 });
             }
@@ -734,7 +715,7 @@ class ZKSessionLock implements SessionLock {
 
     /**
      * Try lock. If wait is true, it would wait and watch sibling to acquire lock when
-     * the sibling is dead. <i>acquireFuture</i> will be notified either it locked successfully
+     * the sibling is dead. <i>acquireFuture</i> will be notified whether it locked successfully
      * or the lock failed. The promise will only satisfy with current lock owner.
      *
      * NOTE: the <i>promise</i> is only satisfied on <i>lockStateExecutor</i>, so any
@@ -745,12 +726,12 @@ class ZKSessionLock implements SessionLock {
      * @param promise
      *          promise to satisfy with current lock owner.
      */
-    private void asyncTryLockWithoutCleanup(final boolean wait, final Promise<String> promise) {
+    private void asyncTryLockWithoutCleanup(final boolean wait, final CompletableFuture<String> promise) {
         executeLockAction(epoch.get(), new LockAction() {
             @Override
             public void execute() {
                 if (!lockState.inState(State.INIT)) {
-                    promise.setException(new LockStateChangedException(lockPath, lockId, State.INIT, lockState.getState()));
+                    promise.completeExceptionally(new LockStateChangedException(lockPath, lockId, State.INIT, lockState.getState()));
                     return;
                 }
                 lockState.transition(State.PREPARING);
@@ -776,7 +757,7 @@ class ZKSessionLock implements SessionLock {
                                     public void execute() {
                                         if (KeeperException.Code.OK.intValue() != rc) {
                                             KeeperException ke = KeeperException.create(KeeperException.Code.get(rc));
-                                            promise.setException(ke);
+                                            promise.completeExceptionally(ke);
                                             return;
                                         }
 
@@ -797,14 +778,12 @@ class ZKSessionLock implements SessionLock {
                                         if (lockState.isExpiredOrClosing()) {
                                             // Delete node attempt may have come after PREPARING but before create node, in which case
                                             // we'd be left with a dangling node unless we clean up.
-                                            Promise<BoxedUnit> deletePromise = new Promise<BoxedUnit>();
+                                            CompletableFuture<Void> deletePromise = new CompletableFuture<Void>();
                                             deleteLockNode(deletePromise);
-                                            deletePromise.ensure(new Function0<BoxedUnit>() {
-                                                public BoxedUnit apply() {
-                                                    promise.setException(new LockClosedException(lockPath, lockId, lockState.getState()));
-                                                    return BoxedUnit.UNIT;
-                                                }
-                                            });
+                                            FutureUtils.ensure(
+                                                deletePromise,
+                                                () -> promise.completeExceptionally(
+                                                    new LockClosedException(lockPath, lockId, lockState.getState())));
                                             return;
                                         }
 
@@ -830,7 +809,7 @@ class ZKSessionLock implements SessionLock {
     @Override
     public void tryLock(long timeout, TimeUnit unit) throws LockingException {
         final Stopwatch stopwatch = Stopwatch.createStarted();
-        Future<LockWaiter> tryFuture = asyncTryLock(timeout, unit);
+        CompletableFuture<LockWaiter> tryFuture = asyncTryLock(timeout, unit);
         LockWaiter waiter = waitForTry(stopwatch, tryFuture);
         boolean acquired = waiter.waitForAcquireQuietly();
         if (!acquired) {
@@ -838,13 +817,13 @@ class ZKSessionLock implements SessionLock {
         }
     }
 
-    synchronized LockWaiter waitForTry(Stopwatch stopwatch, Future<LockWaiter> tryFuture)
+    synchronized LockWaiter waitForTry(Stopwatch stopwatch, CompletableFuture<LockWaiter> tryFuture)
             throws LockingException {
         boolean success = false;
         boolean stateChanged = false;
         LockWaiter waiter;
         try {
-            waiter = Await.result(tryFuture, Duration.fromMilliseconds(lockOpTimeout));
+            waiter = FutureUtils.result(tryFuture, lockOpTimeout, TimeUnit.MILLISECONDS);
             success = true;
         } catch (LockStateChangedException ex) {
             stateChanged = true;
@@ -873,12 +852,12 @@ class ZKSessionLock implements SessionLock {
     }
 
     @Override
-    public Future<BoxedUnit> asyncUnlock() {
+    public CompletableFuture<Void> asyncUnlock() {
         return asyncUnlock(new LockClosedException(lockPath, lockId, lockState.getState()));
     }
 
-    Future<BoxedUnit> asyncUnlock(final Throwable cause) {
-        final Promise<BoxedUnit> promise = new Promise<BoxedUnit>();
+    CompletableFuture<Void> asyncUnlock(final Throwable cause) {
+        final CompletableFuture<Void> promise = new CompletableFuture<Void>();
 
         // Use lock executor here rather than lock action, because we want this operation to be applied
         // whether the epoch has changed or not. The member node is EPHEMERAL_SEQUENTIAL so there's no
@@ -886,9 +865,9 @@ class ZKSessionLock implements SessionLock {
         lockStateExecutor.submit(lockPath, new SafeRunnable() {
             @Override
             public void safeRun() {
-                acquireFuture.updateIfEmpty(new Throw<Boolean>(cause));
+                acquireFuture.completeExceptionally(cause);
                 unlockInternal(promise);
-                promise.addEventListener(new OpStatsListener<BoxedUnit>(unlockStats));
+                promise.whenComplete(new OpStatsListener<Void>(unlockStats));
             }
         });
 
@@ -897,9 +876,9 @@ class ZKSessionLock implements SessionLock {
 
     @Override
     public void unlock() {
-        Future<BoxedUnit> unlockResult = asyncUnlock();
+        CompletableFuture<Void> unlockResult = asyncUnlock();
         try {
-            Await.result(unlockResult, Duration.fromMilliseconds(lockOpTimeout));
+            FutureUtils.result(unlockResult, lockOpTimeout, TimeUnit.MILLISECONDS);
         } catch (TimeoutException toe) {
             // This shouldn't happen unless we lose a watch, and may result in a leaked lock.
             LOG.error("Timeout unlocking {} owned by {} : ", new Object[] { lockPath, lockId, toe });
@@ -921,13 +900,13 @@ class ZKSessionLock implements SessionLock {
                     new Object[] { lockPath, System.currentTimeMillis(),
                             lockEpoch, ZKSessionLock.this.epoch.get() });
         }
-        acquireFuture.updateIfEmpty(new Return<Boolean>(true));
+        acquireFuture.complete(true);
     }
 
     /**
      * NOTE: unlockInternal should only be called after try lock.
      */
-    private void unlockInternal(final Promise<BoxedUnit> promise) {
+    private void unlockInternal(final CompletableFuture<Void> promise) {
 
         // already closed or expired, nothing to cleanup
         this.epoch.incrementAndGet();
@@ -936,7 +915,7 @@ class ZKSessionLock implements SessionLock {
         }
 
         if (lockState.inState(State.CLOSED)) {
-            promise.setValue(BoxedUnit.UNIT);
+            promise.complete(null);
             return;
         }
 
@@ -951,39 +930,34 @@ class ZKSessionLock implements SessionLock {
             // Nothing to cleanup if INIT (never tried) or EXPIRED (ephemeral node
             // auto-removed)
             lockState.transition(State.CLOSED);
-            promise.setValue(BoxedUnit.UNIT);
+            promise.complete(null);
             return;
         }
 
         // In any other state, we should clean up the member node
-        Promise<BoxedUnit> deletePromise = new Promise<BoxedUnit>();
+        CompletableFuture<Void> deletePromise = new CompletableFuture<Void>();
         deleteLockNode(deletePromise);
 
         // Set the state to closed after we've cleaned up
-        deletePromise.addEventListener(new FutureEventListener<BoxedUnit>() {
+        deletePromise.whenCompleteAsync(new FutureEventListener<Void>() {
             @Override
-            public void onSuccess(BoxedUnit complete) {
-                lockStateExecutor.submit(lockPath, new SafeRunnable() {
-                    @Override
-                    public void safeRun() {
-                        lockState.transition(State.CLOSED);
-                        promise.setValue(BoxedUnit.UNIT);
-                    }
-                });
+            public void onSuccess(Void complete) {
+                lockState.transition(State.CLOSED);
+                promise.complete(null);
             }
             @Override
             public void onFailure(Throwable cause) {
                 // Delete failure is quite serious (causes lock leak) and should be
                 // handled better
                 LOG.error("lock node delete failed {} {}", lockId, lockPath);
-                promise.setValue(BoxedUnit.UNIT);
+                promise.complete(null);
             }
-        });
+        }, lockStateExecutor.chooseExecutor(lockPath));
     }
 
-    private void deleteLockNode(final Promise<BoxedUnit> promise) {
+    private void deleteLockNode(final CompletableFuture<Void> promise) {
         if (null == currentNode) {
-            promise.setValue(BoxedUnit.UNIT);
+            promise.complete(null);
             return;
         }
 
@@ -1005,7 +979,7 @@ class ZKSessionLock implements SessionLock {
                         }
 
                         FailpointUtils.checkFailPointNoThrow(FailpointUtils.FailPointName.FP_LockUnlockCleanup);
-                        promise.setValue(BoxedUnit.UNIT);
+                        promise.complete(null);
                     }
                 });
             }
@@ -1041,8 +1015,8 @@ class ZKSessionLock implements SessionLock {
 
                 // if session expired, just notify the waiter. as the lock acquire doesn't succeed.
                 // we don't even need to clean up the lock as the znode will disappear after session expired
-                acquireFuture.updateIfEmpty(new Throw<Boolean>(
-                        new LockSessionExpiredException(lockPath, lockId, lockState.getState())));
+                acquireFuture.completeExceptionally(
+                    new LockSessionExpiredException(lockPath, lockId, lockState.getState()));
 
                 // session expired, ephemeral node is gone.
                 currentNode = null;
@@ -1088,9 +1062,9 @@ class ZKSessionLock implements SessionLock {
         });
     }
 
-    private Future<String> checkLockOwnerAndWaitIfPossible(final LockWatcher lockWatcher,
+    private CompletableFuture<String> checkLockOwnerAndWaitIfPossible(final LockWatcher lockWatcher,
                                                            final boolean wait) {
-        final Promise<String> promise = new Promise<String>();
+        final CompletableFuture<String> promise = new CompletableFuture<String>();
         checkLockOwnerAndWaitIfPossible(lockWatcher, wait, promise);
         return promise;
     }
@@ -1107,7 +1081,7 @@ class ZKSessionLock implements SessionLock {
      */
     private void checkLockOwnerAndWaitIfPossible(final LockWatcher lockWatcher,
                                                  final boolean wait,
-                                                 final Promise<String> promise) {
+                                                 final CompletableFuture<String> promise) {
         zk.getChildren(lockPath, false, new AsyncCallback.Children2Callback() {
             @Override
             public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
@@ -1134,22 +1108,22 @@ class ZKSessionLock implements SessionLock {
                                     final boolean wait,
                                     final int getChildrenRc,
                                     final List<String> children,
-                                    final Promise<String> promise) {
+                                    final CompletableFuture<String> promise) {
         executeLockAction(lockWatcher.epoch, new LockAction() {
             @Override
             public void execute() {
                 if (!lockState.inState(State.PREPARED)) { // e.g. lock closed or session expired after prepared
-                    promise.setException(new LockStateChangedException(lockPath, lockId, State.PREPARED, lockState.getState()));
+                    promise.completeExceptionally(new LockStateChangedException(lockPath, lockId, State.PREPARED, lockState.getState()));
                     return;
                 }
 
                 if (KeeperException.Code.OK.intValue() != getChildrenRc) {
-                    promise.setException(KeeperException.create(KeeperException.Code.get(getChildrenRc)));
+                    promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(getChildrenRc)));
                     return;
                 }
                 if (children.isEmpty()) {
                     LOG.error("Error, member list is empty for lock {}.", lockPath);
-                    promise.setException(new UnexpectedException("Empty member list for lock " + lockPath));
+                    promise.completeExceptionally(new UnexpectedException("Empty member list for lock " + lockPath));
                     return;
                 }
 
@@ -1164,10 +1138,10 @@ class ZKSessionLock implements SessionLock {
                 if (memberIndex == 0) {
                     LOG.info("{} acquired the lock {}.", cid, lockPath);
                     claimOwnership(lockWatcher.epoch);
-                    promise.setValue(cid);
+                    promise.complete(cid);
                 } else if (memberIndex > 0) { // we are in the member list but we didn't hold the lock
                     // get ownership of current owner
-                    asyncParseClientID(zk, lockPath, children.get(0)).addEventListener(new FutureEventListener<Pair<String, Long>>() {
+                    asyncParseClientID(zk, lockPath, children.get(0)).whenComplete(new FutureEventListener<Pair<String, Long>>() {
                         @Override
                         public void onSuccess(Pair<String, Long> currentOwner) {
                             watchLockOwner(lockWatcher, wait,
@@ -1179,7 +1153,7 @@ class ZKSessionLock implements SessionLock {
                             executeLockAction(lockWatcher.epoch, new LockAction() {
                                 @Override
                                 public void execute() {
-                                    promise.setException(cause);
+                                    promise.completeExceptionally(cause);
                                 }
 
                                 @Override
@@ -1192,7 +1166,7 @@ class ZKSessionLock implements SessionLock {
                 } else {
                     LOG.error("Member {} doesn't exist in the members list {} for lock {}.",
                             new Object[]{ cid, children, lockPath});
-                    promise.setException(
+                    promise.completeExceptionally(
                             new UnexpectedException("Member " + cid + " doesn't exist in member list " +
                                     children + " for lock " + lockPath));
                 }
@@ -1229,7 +1203,7 @@ class ZKSessionLock implements SessionLock {
                                 final String siblingNode,
                                 final String ownerNode,
                                 final Pair<String, Long> currentOwner,
-                                final Promise<String> promise) {
+                                final CompletableFuture<String> promise) {
         executeLockAction(lockWatcher.epoch, new LockAction() {
             @Override
             public void execute() {
@@ -1270,7 +1244,7 @@ class ZKSessionLock implements SessionLock {
                                 @Override
                                 public void execute() {
                                     if (!lockState.inState(State.PREPARED)) {
-                                        promise.setException(new LockStateChangedException(lockPath, lockId, State.PREPARED, lockState.getState()));
+                                        promise.completeExceptionally(new LockStateChangedException(lockPath, lockId, State.PREPARED, lockState.getState()));
                                         return;
                                     }
 
@@ -1280,17 +1254,17 @@ class ZKSessionLock implements SessionLock {
                                             LOG.info("LockWatcher {} claimed ownership for {} after set watcher on {}.",
                                                     new Object[]{ myNode, lockPath, ownerNode });
                                             claimOwnership(lockWatcher.epoch);
-                                            promise.setValue(currentOwner.getLeft());
+                                            promise.complete(currentOwner.getLeft());
                                         } else {
                                             // watch sibling successfully
                                             lockState.transition(State.WAITING);
-                                            promise.setValue(currentOwner.getLeft());
+                                            promise.complete(currentOwner.getLeft());
                                         }
                                     } else if (KeeperException.Code.NONODE.intValue() == rc) {
                                         // sibling just disappeared, it might be the chance to claim ownership
                                         checkLockOwnerAndWaitIfPossible(lockWatcher, wait, promise);
                                     } else {
-                                        promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
+                                        promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
                                     }
                                 }
 
@@ -1305,7 +1279,7 @@ class ZKSessionLock implements SessionLock {
                         }
                     }, null);
                 } else {
-                    promise.setValue(currentOwner.getLeft());
+                    promise.complete(currentOwner.getLeft());
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLockFactory.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLockFactory.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLockFactory.java
index 3cb25f0..223a3a4 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLockFactory.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLockFactory.java
@@ -17,20 +17,17 @@
  */
 package org.apache.distributedlog.lock;
 
-import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.distributedlog.exceptions.DLInterruptedException;
-import org.apache.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import com.twitter.util.Return;
-import com.twitter.util.Throw;
-import org.apache.bookkeeper.stats.StatsLogger;
-import scala.runtime.BoxedUnit;
-
 import java.io.IOException;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.distributedlog.ZooKeeperClient;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.util.OrderedScheduler;
+import org.apache.bookkeeper.stats.StatsLogger;
 
 /**
  * Factory to create zookeeper based locks.
@@ -65,16 +62,14 @@ public class ZKSessionLockFactory implements SessionLockFactory {
     }
 
     @Override
-    public Future<SessionLock> createLock(String lockPath,
-                                          DistributedLockContext context) {
+    public CompletableFuture<SessionLock> createLock(String lockPath,
+                                                     DistributedLockContext context) {
         AtomicInteger numRetries = new AtomicInteger(lockCreationRetries);
         final AtomicReference<Throwable> interruptedException = new AtomicReference<Throwable>(null);
-        Promise<SessionLock> createPromise =
-                new Promise<SessionLock>(new com.twitter.util.Function<Throwable, BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(Throwable t) {
-                interruptedException.set(t);
-                return BoxedUnit.UNIT;
+        CompletableFuture<SessionLock> createPromise = FutureUtils.createFuture();
+        createPromise.whenComplete((value, cause) -> {
+            if (null != cause && cause instanceof CancellationException) {
+                interruptedException.set(cause);
             }
         });
         createLock(
@@ -91,13 +86,13 @@ public class ZKSessionLockFactory implements SessionLockFactory {
                     final DistributedLockContext context,
                     final AtomicReference<Throwable> interruptedException,
                     final AtomicInteger numRetries,
-                    final Promise<SessionLock> createPromise,
+                    final CompletableFuture<SessionLock> createPromise,
                     final long delayMs) {
         lockStateExecutor.schedule(lockPath, new Runnable() {
             @Override
             public void run() {
                 if (null != interruptedException.get()) {
-                    createPromise.updateIfEmpty(new Throw<SessionLock>(interruptedException.get()));
+                    createPromise.completeExceptionally(interruptedException.get());
                     return;
                 }
                 try {
@@ -109,14 +104,14 @@ public class ZKSessionLockFactory implements SessionLockFactory {
                             lockOpTimeout,
                             lockStatsLogger,
                             context);
-                    createPromise.updateIfEmpty(new Return<SessionLock>(lock));
+                    createPromise.complete(lock);
                 } catch (DLInterruptedException dlie) {
                     // if the creation is interrupted, fail with the exception without retrying.
-                    createPromise.updateIfEmpty(new Throw<SessionLock>(dlie));
+                    createPromise.completeExceptionally(dlie);
                     return;
                 } catch (IOException e) {
                     if (numRetries.getAndDecrement() < 0) {
-                        createPromise.updateIfEmpty(new Throw<SessionLock>(e));
+                        createPromise.completeExceptionally(e);
                         return;
                     }
                     createLock(

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryReader.java
index 8440509..1b292e3 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryReader.java
@@ -18,12 +18,11 @@
 package org.apache.distributedlog.logsegment;
 
 import com.google.common.annotations.Beta;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.Entry;
 import org.apache.distributedlog.LogSegmentMetadata;
 import org.apache.distributedlog.io.AsyncCloseable;
-import com.twitter.util.Future;
-
-import java.util.List;
 
 /**
  * An interface class to read the enveloped entry (serialized bytes of
@@ -87,7 +86,7 @@ public interface LogSegmentEntryReader extends AsyncCloseable {
      * @throws {@link org.apache.distributedlog.exceptions.EndOfLogSegmentException} when
      *          read entries beyond the end of a <i>closed</i> log segment.
      */
-    Future<List<Entry.Reader>> readNext(int numEntries);
+    CompletableFuture<List<Entry.Reader>> readNext(int numEntries);
 
     /**
      * Return the last add confirmed entry id (LAC).

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryStore.java
index 40be67b..ab2d898 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryStore.java
@@ -18,12 +18,11 @@
 package org.apache.distributedlog.logsegment;
 
 import com.google.common.annotations.Beta;
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.LogSegmentMetadata;
 import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
 import org.apache.distributedlog.metadata.LogMetadataForWriter;
 import org.apache.distributedlog.util.Allocator;
-import org.apache.distributedlog.util.Transaction;
-import com.twitter.util.Future;
 
 import java.io.IOException;
 
@@ -39,7 +38,7 @@ public interface LogSegmentEntryStore {
      * @param segment log segment metadata
      * @return future representing the delete result
      */
-    Future<LogSegmentMetadata> deleteLogSegment(LogSegmentMetadata segment);
+    CompletableFuture<LogSegmentMetadata> deleteLogSegment(LogSegmentMetadata segment);
 
     /**
      * Create a new log segment allocator for allocating log segment entry writers.
@@ -58,7 +57,7 @@ public interface LogSegmentEntryStore {
      * @param startEntryId the start entry id
      * @return future representing the opened reader
      */
-    Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment,
+    CompletableFuture<LogSegmentEntryReader> openReader(LogSegmentMetadata segment,
                                              long startEntryId);
 
     /**
@@ -68,6 +67,6 @@ public interface LogSegmentEntryStore {
      * @param fence the flag to fence log segment
      * @return future representing the opened random access reader
      */
-    Future<LogSegmentRandomAccessEntryReader> openRandomAccessReader(LogSegmentMetadata segment,
+    CompletableFuture<LogSegmentRandomAccessEntryReader> openRandomAccessReader(LogSegmentMetadata segment,
                                                                      boolean fence);
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryWriter.java
index 660592e..70f0da0 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryWriter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryWriter.java
@@ -19,7 +19,7 @@ package org.apache.distributedlog.logsegment;
 
 import com.google.common.annotations.Beta;
 import org.apache.distributedlog.Entry;
-import org.apache.distributedlog.util.Sizable;
+import org.apache.distributedlog.common.util.Sizable;
 import org.apache.bookkeeper.client.AsyncCallback;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentMetadataStore.java
index fdf72b1..a58264c 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentMetadataStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentMetadataStore.java
@@ -18,15 +18,15 @@
 package org.apache.distributedlog.logsegment;
 
 import com.google.common.annotations.Beta;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.distributedlog.LogSegmentMetadata;
 import org.apache.distributedlog.callback.LogSegmentNamesListener;
 import org.apache.distributedlog.metadata.LogMetadata;
 import org.apache.distributedlog.metadata.LogMetadataForWriter;
 import org.apache.distributedlog.util.Transaction;
 import org.apache.distributedlog.util.Transaction.OpListener;
-import com.twitter.util.Future;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
 
 import java.io.Closeable;
 import java.util.List;
@@ -135,7 +135,7 @@ public interface LogSegmentMetadataStore extends Closeable {
      *          path to store log segment metadata
      * @return future of the retrieved log segment metadata
      */
-    Future<LogSegmentMetadata> getLogSegment(String logSegmentPath);
+    CompletableFuture<LogSegmentMetadata> getLogSegment(String logSegmentPath);
 
     /**
      * Retrieve the list of log segments under <code>logSegmentsPath</code> and register a <i>listener</i>
@@ -147,8 +147,8 @@ public interface LogSegmentMetadataStore extends Closeable {
      *          log segment listener on log segment changes
      * @return future of the retrieved list of log segment names
      */
-    Future<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath,
-                                                       LogSegmentNamesListener listener);
+    CompletableFuture<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath,
+                                                                  LogSegmentNamesListener listener);
 
     /**
      * Unregister a log segment <code>listener</code> on log segment changes under <code>logSegmentsPath</code>.

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java
index 948ce30..23c784e 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java
@@ -17,11 +17,10 @@
  */
 package org.apache.distributedlog.logsegment;
 
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.Entry;
 import org.apache.distributedlog.io.AsyncCloseable;
-import com.twitter.util.Future;
-
-import java.util.List;
 
 /**
  * An interface class to read entries {@link org.apache.distributedlog.Entry}
@@ -36,7 +35,7 @@ public interface LogSegmentRandomAccessEntryReader extends AsyncCloseable {
      * @param endEntryId end entry id
      * @return A promise that when satisfied will contain a list of entries of [startEntryId, endEntryId].
      */
-    Future<List<Entry.Reader>> readEntries(long startEntryId, long endEntryId);
+    CompletableFuture<List<Entry.Reader>> readEntries(long startEntryId, long endEntryId);
 
     /**
      * Return the last add confirmed entry id (LAC).

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentWriter.java
index 39c94f4..c483403 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentWriter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentWriter.java
@@ -18,15 +18,14 @@
 package org.apache.distributedlog.logsegment;
 
 import com.google.common.annotations.Beta;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.LogRecord;
 import org.apache.distributedlog.exceptions.BKTransmitException;
 import org.apache.distributedlog.exceptions.LockingException;
 import org.apache.distributedlog.io.AsyncAbortable;
 import org.apache.distributedlog.io.AsyncCloseable;
-import com.twitter.util.Future;
-
-import java.io.IOException;
 
 /**
  * An interface class to write log records into a log segment.
@@ -53,7 +52,7 @@ public interface LogSegmentWriter extends AsyncCloseable, AsyncAbortable {
      * @throws BKTransmitException if failed to transmit data to bk
      * @throws org.apache.distributedlog.exceptions.WriteException if failed to write to bk
      */
-    public Future<DLSN> asyncWrite(LogRecord record);
+    public CompletableFuture<DLSN> asyncWrite(LogRecord record);
 
     /**
      * This isn't a simple synchronous version of {@code asyncWrite}. It has different semantic.
@@ -74,7 +73,7 @@ public interface LogSegmentWriter extends AsyncCloseable, AsyncAbortable {
      *
      * @return future representing the transmit result with last acknowledged transaction id.
      */
-    public Future<Long> flush();
+    public CompletableFuture<Long> flush();
 
     /**
      * Commit the current acknowledged data. It is the consequent operation of {@link #flush()},
@@ -82,6 +81,6 @@ public interface LogSegmentWriter extends AsyncCloseable, AsyncAbortable {
      *
      * @return future representing the commit result.
      */
-    public Future<Long> commit();
+    public CompletableFuture<Long> commit();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/RollingPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/RollingPolicy.java b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/RollingPolicy.java
index ce98ac9..4844ad4 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/RollingPolicy.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/RollingPolicy.java
@@ -17,7 +17,7 @@
  */
 package org.apache.distributedlog.logsegment;
 
-import org.apache.distributedlog.util.Sizable;
+import org.apache.distributedlog.common.util.Sizable;
 
 public interface RollingPolicy {
     /**

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/SizeBasedRollingPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/SizeBasedRollingPolicy.java b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/SizeBasedRollingPolicy.java
index 544f58e..1c3794a 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/SizeBasedRollingPolicy.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/SizeBasedRollingPolicy.java
@@ -17,7 +17,7 @@
  */
 package org.apache.distributedlog.logsegment;
 
-import org.apache.distributedlog.util.Sizable;
+import org.apache.distributedlog.common.util.Sizable;
 
 public class SizeBasedRollingPolicy implements RollingPolicy {
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/TimeBasedRollingPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/TimeBasedRollingPolicy.java b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/TimeBasedRollingPolicy.java
index 141f139..80c09be 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/TimeBasedRollingPolicy.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/TimeBasedRollingPolicy.java
@@ -17,7 +17,7 @@
  */
 package org.apache.distributedlog.logsegment;
 
-import org.apache.distributedlog.util.Sizable;
+import org.apache.distributedlog.common.util.Sizable;
 import org.apache.distributedlog.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/DLMetadata.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/DLMetadata.java b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/DLMetadata.java
index 7339d55..948f2bf 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/DLMetadata.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/DLMetadata.java
@@ -18,6 +18,7 @@
 package org.apache.distributedlog.metadata;
 
 import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.ZKException;
 import org.apache.distributedlog.impl.metadata.BKDLConfig;
 import org.apache.distributedlog.util.Utils;
 import org.apache.distributedlog.ZooKeeperClient;
@@ -144,13 +145,9 @@ public class DLMetadata {
         byte[] data = serialize();
         try {
             Utils.zkCreateFullPathOptimistic(zkc, uri.getPath(), data,
-                    zkc.getDefaultACL(), CreateMode.PERSISTENT);
-        } catch (KeeperException e) {
-            throw new IOException("Fail to write dl metadata " + new String(data, UTF_8)
-                    +  " to uri " + uri, e);
-        } catch (InterruptedException e) {
-            throw new IOException("Interrupted when writing dl metadata " + new String(data, UTF_8)
-                    + " to uri " + uri, e);
+                zkc.getDefaultACL(), CreateMode.PERSISTENT);
+        } catch (KeeperException ke) {
+            throw new ZKException("Encountered zookeeper exception on creating dl metadata", ke);
         } finally {
             zkc.close();
         }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/DryrunLogSegmentMetadataStoreUpdater.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/DryrunLogSegmentMetadataStoreUpdater.java b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/DryrunLogSegmentMetadataStoreUpdater.java
index 6b835b9..fe52804 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/DryrunLogSegmentMetadataStoreUpdater.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/DryrunLogSegmentMetadataStoreUpdater.java
@@ -17,10 +17,11 @@
  */
 package org.apache.distributedlog.metadata;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.logsegment.LogSegmentMetadataStore;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.Transaction;
-import com.twitter.util.Future;
 
 public class DryrunLogSegmentMetadataStoreUpdater extends LogSegmentMetadataStoreUpdater {
 
@@ -38,8 +39,8 @@ public class DryrunLogSegmentMetadataStoreUpdater extends LogSegmentMetadataStor
             }
 
             @Override
-            public Future<Void> execute() {
-                return Future.Void();
+            public CompletableFuture<Void> execute() {
+                return FutureUtils.Void();
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogMetadataStore.java
index f8fd777..8135678 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogMetadataStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogMetadataStore.java
@@ -19,11 +19,10 @@ package org.apache.distributedlog.metadata;
 
 import com.google.common.annotations.Beta;
 import com.google.common.base.Optional;
-import org.apache.distributedlog.callback.NamespaceListener;
-import com.twitter.util.Future;
-
 import java.net.URI;
 import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.callback.NamespaceListener;
 
 /**
  * Interface for log metadata store.
@@ -38,7 +37,7 @@ public interface LogMetadataStore {
      *          name of the log
      * @return namespace location that stores this stream.
      */
-    Future<URI> createLog(String logName);
+    CompletableFuture<URI> createLog(String logName);
 
     /**
      * Get the location of the log.
@@ -47,14 +46,14 @@ public interface LogMetadataStore {
      *          name of the log
      * @return namespace location that stores this stream.
      */
-    Future<Optional<URI>> getLogLocation(String logName);
+    CompletableFuture<Optional<URI>> getLogLocation(String logName);
 
     /**
      * Retrieves logs from the namespace.
      *
      * @return iterator of logs of the namespace.
      */
-    Future<Iterator<String>> getLogs();
+    CompletableFuture<Iterator<String>> getLogs();
 
     /**
      * Register a namespace listener on streams changes.


Mime
View raw message