distributedlog-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [2/2] incubator-distributedlog git commit: DL-31: Provide flag to disable zk based distributed lock
Date Tue, 23 Aug 2016 01:05:44 GMT
DL-31: Provide flag to disable zk based distributed lock

DL doesn't enforce any leader election. However it still provides a zookeeper ephemeral znode based lock for leader election. It is unnecessary if applications use core library directly already have its own leader election mechanism.

This change is to provide a flag to allow disable the zk based lock.

Author: Sijie Guo <sijieg@twitter.com>

Reviewers: Leigh Stewart <lstewart@apache.org>

Closes #9 from sijie/sijie/flag_to_disable_lock


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/89613fb7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/89613fb7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/89613fb7

Branch: refs/heads/master
Commit: 89613fb75ad282b0037166205ae68ea07ae76024
Parents: b23291a
Author: Sijie Guo <sijieg@twitter.com>
Authored: Mon Aug 22 18:05:39 2016 -0700
Committer: Sijie Guo <sijieg@twitter.com>
Committed: Mon Aug 22 18:05:39 2016 -0700

----------------------------------------------------------------------
 .../distributedlog/BKDistributedLogManager.java |  25 +-
 .../distributedlog/BKLogReadHandler.java        |   9 +-
 .../distributedlog/BKLogWriteHandler.java       |   4 +-
 .../DistributedLogConfiguration.java            |  26 +
 .../distributedlog/LocalDLMEmulator.java        |  34 +-
 .../distributedlog/lock/DistributedLock.java    | 514 +-----------------
 .../distributedlog/lock/NopDistributedLock.java |  34 ++
 .../distributedlog/lock/SessionLock.java        |   2 +-
 .../distributedlog/lock/ZKDistributedLock.java  | 537 +++++++++++++++++++
 .../com/twitter/distributedlog/DLMTestUtil.java |   1 +
 .../distributedlog/TestAsyncReaderWriter.java   |   1 -
 .../TestBKDistributedLogManager.java            |  24 +
 .../distributedlog/TestBKLogSegmentWriter.java  |  56 +-
 .../distributedlog/TestDistributedLogBase.java  |  12 +-
 .../lock/TestDistributedLock.java               |  86 +--
 .../service/DistributedLogCluster.java          |   9 +-
 16 files changed, 754 insertions(+), 620 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
index 9c19381..a5be03c 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
@@ -36,8 +36,10 @@ import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore;
 import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
 import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
 import com.twitter.distributedlog.io.AsyncCloseable;
-import com.twitter.distributedlog.lock.SessionLockFactory;
 import com.twitter.distributedlog.lock.DistributedLock;
+import com.twitter.distributedlog.lock.NopDistributedLock;
+import com.twitter.distributedlog.lock.SessionLockFactory;
+import com.twitter.distributedlog.lock.ZKDistributedLock;
 import com.twitter.distributedlog.lock.ZKSessionLockFactory;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
 import com.twitter.distributedlog.metadata.BKDLConfig;
@@ -100,9 +102,9 @@ import java.util.concurrent.TimeUnit;
  * scope `writer_future_pool`. See {@link MonitoredFuturePool} for detail stats.
  * <li> `reader_future_pool/*`: metrics about the future pools that used by readers are exposed under
  * scope `reader_future_pool`. See {@link MonitoredFuturePool} for detail stats.
- * <li> `lock/*`: metrics about the locks used by writers. See {@link DistributedLock} for detail
+ * <li> `lock/*`: metrics about the locks used by writers. See {@link ZKDistributedLock} for detail
  * stats.
- * <li> `read_lock/*`: metrics about the locks used by readers. See {@link DistributedLock} for
+ * <li> `read_lock/*`: metrics about the locks used by readers. See {@link ZKDistributedLock} for
  * detail stats.
  * <li> `logsegments/*`: metrics about basic operations on log segments. See {@link BKLogHandler} for details.
  * <li> `segments/*`: metrics about write operations on log segments. See {@link BKLogWriteHandler} for details.
@@ -604,12 +606,17 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
                                     final Promise<BKLogWriteHandler> createPromise) {
         OrderedScheduler lockStateExecutor = getLockStateExecutor(true);
         // Build the locks
-        DistributedLock lock = new DistributedLock(
-                lockStateExecutor,
-                getLockFactory(true),
-                logMetadata.getLockPath(),
-                conf.getLockTimeoutMilliSeconds(),
-                statsLogger);
+        DistributedLock lock;
+        if (conf.isWriteLockEnabled()) {
+            lock = new ZKDistributedLock(
+                    lockStateExecutor,
+                    getLockFactory(true),
+                    logMetadata.getLockPath(),
+                    conf.getLockTimeoutMilliSeconds(),
+                    statsLogger);
+        } else {
+            lock = NopDistributedLock.INSTANCE;
+        }
         // Build the ledger allocator
         LedgerAllocator allocator;
         try {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
index 80f1270..0bf6b84 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
@@ -30,7 +30,9 @@ import com.twitter.distributedlog.exceptions.LockingException;
 import com.twitter.distributedlog.exceptions.LogNotFoundException;
 import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
 import com.twitter.distributedlog.injector.AsyncFailureInjector;
+import com.twitter.distributedlog.lock.DistributedLock;
 import com.twitter.distributedlog.lock.SessionLockFactory;
+import com.twitter.distributedlog.lock.ZKDistributedLock;
 import com.twitter.distributedlog.lock.ZKSessionLockFactory;
 import com.twitter.distributedlog.logsegment.LogSegmentFilter;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
@@ -39,7 +41,6 @@ import com.twitter.distributedlog.stats.BroadCastStatsLogger;
 import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.distributedlog.lock.DistributedLock;
 import com.twitter.distributedlog.util.Utils;
 import com.twitter.util.ExceptionalFunction;
 import com.twitter.util.ExceptionalFunction0;
@@ -98,7 +99,7 @@ import scala.runtime.BoxedUnit;
  * becoming idle.
  * </ul>
  * <h4>Read Lock</h4>
- * All read lock related stats are exposed under scope `read_lock`. See {@link DistributedLock}
+ * All read lock related stats are exposed under scope `read_lock`. See {@link ZKDistributedLock}
  * for detail stats.
  */
 class BKLogReadHandler extends BKLogHandler {
@@ -216,7 +217,7 @@ class BKLogReadHandler extends BKLogHandler {
                 public DistributedLock applyE() throws IOException {
                     // Unfortunately this has a blocking call which we should not execute on the
                     // ZK completion thread
-                    BKLogReadHandler.this.readLock = new DistributedLock(
+                    BKLogReadHandler.this.readLock = new ZKDistributedLock(
                             lockStateExecutor,
                             lockFactory,
                             readLockPath,
@@ -247,7 +248,7 @@ class BKLogReadHandler extends BKLogHandler {
      * executor service thread.
      */
     Future<Void> acquireLockOnExecutorThread(DistributedLock lock) throws LockingException {
-        final Future<DistributedLock> acquireFuture = lock.asyncAcquire();
+        final Future<? extends DistributedLock> acquireFuture = lock.asyncAcquire();
 
         // The future we return must be satisfied on an executor service thread. If we simply
         // return the future returned by asyncAcquire, user callbacks may end up running in

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
index 630d626..d73c5e2 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
@@ -110,7 +110,7 @@ class BKLogWriteHandler extends BKLogHandler {
     protected final int regionId;
     protected volatile boolean closed = false;
     protected final RollingPolicy rollingPolicy;
-    protected Future<DistributedLock> lockFuture = null;
+    protected Future<? extends DistributedLock> lockFuture = null;
     protected final PermitLimiter writeLimiter;
     protected final FeatureProvider featureProvider;
     protected final DynamicDistributedLogConfiguration dynConf;
@@ -337,7 +337,7 @@ class BKLogWriteHandler extends BKLogHandler {
      *
      * @return future represents the lock result
      */
-    Future<DistributedLock> lockHandler() {
+    Future<? extends DistributedLock> lockHandler() {
         if (null != lockFuture) {
             return lockFuture;
         }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
index 0d69f4a..d2af862 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
@@ -267,6 +267,8 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
     public static final int BKDL_LOGSEGMENT_ROLLING_CONCURRENCY_DEFAULT = 1;
 
     // Lock Settings
+    public static final String BKDL_WRITE_LOCK_ENABLED = "writeLockEnabled";
+    public static final boolean BKDL_WRITE_LOCK_ENABLED_DEFAULT = true;
     public static final String BKDL_LOCK_TIMEOUT = "lockTimeoutSeconds";
     public static final long BKDL_LOCK_TIMEOUT_DEFAULT = 30;
     public static final String BKDL_LOCK_REACQUIRE_TIMEOUT = "lockReacquireTimeoutSeconds";
@@ -2039,6 +2041,30 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
     //
 
     /**
+     * Is lock enabled when opening a writer to write a stream?
+     * <p> We don't generally require a lock to write a stream to guarantee correctness. The lock
+     * is more on tracking ownerships. The built-in fencing mechanism is used guarantee correctness
+     * during stream owner failover. It is okay to disable lock if your application knows which nodes
+     * have to write which streams.
+     *
+     * @return true if lock is enabled, otherwise false.
+     */
+    public boolean isWriteLockEnabled() {
+        return this.getBoolean(BKDL_WRITE_LOCK_ENABLED, BKDL_WRITE_LOCK_ENABLED_DEFAULT);
+    }
+
+    /**
+     * Enable lock for opening a writer to write a stream?
+     *
+     * @param enabled flag to enable or disable lock for opening a writer to write a stream.
+     * @return distributedlog configuration.
+     */
+    public DistributedLogConfiguration setWriteLockEnabled(boolean enabled) {
+        setProperty(BKDL_WRITE_LOCK_ENABLED, enabled);
+        return this;
+    }
+
+    /**
      * Get lock timeout in milliseconds. The default value is 30.
      *
      * @return lock timeout in milliseconds

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java
index aaecdd5..85a370f 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java
@@ -34,10 +34,8 @@ import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.net.BindException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -118,9 +116,12 @@ public class LocalDLMEmulator {
                 conf = (ServerConfiguration) DEFAULT_SERVER_CONFIGURATION.clone();
                 conf.setZkTimeout(zkTimeoutSec * 1000);
             }
+            ServerConfiguration newConf = new ServerConfiguration();
+            newConf.loadConf(conf);
+            newConf.setAllowLoopback(true);
 
             return new LocalDLMEmulator(numBookies, shouldStartZK, zkHost, zkPort,
-                initialBookiePort, zkTimeoutSec, conf);
+                initialBookiePort, zkTimeoutSec, newConf);
         }
     }
 
@@ -128,30 +129,6 @@ public class LocalDLMEmulator {
         return new Builder();
     }
 
-    public LocalDLMEmulator(final int numBookies) throws Exception {
-        this(numBookies, true, DEFAULT_ZK_HOST, DEFAULT_ZK_PORT, DEFAULT_BOOKIE_INITIAL_PORT);
-    }
-
-    public LocalDLMEmulator(final int numBookies, final String zkHost, final int zkPort) throws Exception {
-        this(numBookies, false, zkHost, zkPort, DEFAULT_BOOKIE_INITIAL_PORT);
-    }
-
-    public LocalDLMEmulator(final int numBookies, final String zkHost, final int zkPort, final ServerConfiguration serverConf) throws Exception {
-        this(numBookies, false, zkHost, zkPort, DEFAULT_BOOKIE_INITIAL_PORT, DEFAULT_ZK_TIMEOUT_SEC, serverConf);
-    }
-
-    public LocalDLMEmulator(final int numBookies, final int initialBookiePort) throws Exception {
-        this(numBookies, true, DEFAULT_ZK_HOST, DEFAULT_ZK_PORT, initialBookiePort);
-    }
-
-    public LocalDLMEmulator(final int numBookies, final String zkHost, final int zkPort, final int initialBookiePort) throws Exception {
-        this(numBookies, false, zkHost, zkPort, initialBookiePort);
-    }
-
-    private LocalDLMEmulator(final int numBookies, final boolean shouldStartZK, final String zkHost, final int zkPort, final int initialBookiePort) throws Exception {
-        this(numBookies, shouldStartZK, zkHost, zkPort, initialBookiePort, DEFAULT_ZK_TIMEOUT_SEC, new ServerConfiguration());
-    }
-
     private LocalDLMEmulator(final int numBookies, final boolean shouldStartZK, final String zkHost, final int zkPort, final int initialBookiePort, final int zkTimeoutSec, final ServerConfiguration serverConf) throws Exception {
         this.numBookies = numBookies;
         this.zkHost = zkHost;
@@ -162,7 +139,9 @@ public class LocalDLMEmulator {
         this.bkStartupThread = new Thread() {
             public void run() {
                 try {
+                    LOG.info("Starting {} bookies : allowLoopback = {}", numBookies, serverConf.getAllowLoopback());
                     LocalBookKeeper.startLocalBookies(zkHost, zkPort, numBookies, shouldStartZK, initialBookiePort, serverConf);
+                    LOG.info("{} bookies are started.");
                 } catch (InterruptedException e) {
                     // go away quietly
                 } catch (Exception e) {
@@ -205,6 +184,7 @@ public class LocalDLMEmulator {
         ServerConfiguration bookieConf = new ServerConfiguration();
         bookieConf.setZkTimeout(zkTimeoutSec * 1000);
         bookieConf.setBookiePort(0);
+        bookieConf.setAllowLoopback(true);
         File tmpdir = File.createTempFile("bookie" + UUID.randomUUID() + "_",
             "test");
         if (!tmpdir.delete()) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/DistributedLock.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/DistributedLock.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/DistributedLock.java
index ddff9c4..0369946 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/DistributedLock.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/DistributedLock.java
@@ -1,519 +1,37 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package com.twitter.distributedlog.lock;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Stopwatch;
 import com.twitter.distributedlog.exceptions.LockingException;
-import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
 import com.twitter.distributedlog.io.AsyncCloseable;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.FutureUtils.OrderedFutureEventListener;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Function;
 import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction0;
-import scala.runtime.BoxedUnit;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- * Distributed lock, using ZooKeeper.
- * <p/>
- * The lock is vulnerable to timing issues. For example, the process could
- * encounter a really long GC cycle between acquiring the lock, and writing to
- * a ledger. This could have timed out the lock, and another process could have
- * acquired the lock and started writing to bookkeeper. Therefore other
- * mechanisms are required to ensure correctness (i.e. Fencing).
- * <p/>
- * The lock is only allowed to acquire once. If the lock is acquired successfully,
- * the caller holds the ownership until it loses the ownership either because of
- * others already acquired the lock when session expired or explicitly close it.
- * <p>
- * The caller could use {@link #checkOwnership()} or {@link #checkOwnershipAndReacquire()}
- * to check if it still holds the lock. If it doesn't hold the lock, the caller should
- * give up the ownership and close the lock.
- * <h3>Metrics</h3>
- * All the lock related stats are exposed under `lock`.
- * <ul>
- * <li>lock/acquire: opstats. latency spent on acquiring a lock.
- * <li>lock/reacquire: opstats. latency spent on re-acquiring a lock.
- * <li>lock/internalTryRetries: counter. the number of retries on re-creating internal locks.
- * </ul>
- * Other internal lock related stats are also exposed under `lock`. See {@link SessionLock}
- * for details.
+ * Interface for distributed locking
  */
-public class DistributedLock implements LockListener, AsyncCloseable {
-
-    static final Logger LOG = LoggerFactory.getLogger(DistributedLock.class);
-
-    private final SessionLockFactory lockFactory;
-    private final OrderedScheduler lockStateExecutor;
-    private final String lockPath;
-    private final long lockTimeout;
-    private final DistributedLockContext lockContext = new DistributedLockContext();
-
-    // We have two lock acquire futures:
-    // 1. lock acquire future: for the initial acquire op
-    // 2. lock reacquire future: for reacquire necessary when session expires, lock is closed
-    private Future<DistributedLock> lockAcquireFuture = null;
-    private Future<DistributedLock> lockReacquireFuture = null;
-    // following variable tracking the status of acquire process
-    //   => create (internalLock) => tryLock (tryLockFuture) => waitForAcquire (lockWaiter)
-    private SessionLock internalLock = null;
-    private Future<LockWaiter> tryLockFuture = null;
-    private LockWaiter lockWaiter = null;
-    // exception indicating if the reacquire failed
-    private LockingException lockReacquireException = null;
-    // closeFuture
-    private volatile boolean closed = false;
-    private Future<Void> closeFuture = null;
-
-    // A counter to track how many re-acquires happened during a lock's life cycle.
-    private final AtomicInteger reacquireCount = new AtomicInteger(0);
-    private final StatsLogger lockStatsLogger;
-    private final OpStatsLogger acquireStats;
-    private final OpStatsLogger reacquireStats;
-    private final Counter internalTryRetries;
-
-    public DistributedLock(
-            OrderedScheduler lockStateExecutor,
-            SessionLockFactory lockFactory,
-            String lockPath,
-            long lockTimeout,
-            StatsLogger statsLogger) {
-        this.lockStateExecutor = lockStateExecutor;
-        this.lockPath = lockPath;
-        this.lockTimeout = lockTimeout;
-        this.lockFactory = lockFactory;
-
-        lockStatsLogger = statsLogger.scope("lock");
-        acquireStats = lockStatsLogger.getOpStatsLogger("acquire");
-        reacquireStats = lockStatsLogger.getOpStatsLogger("reacquire");
-        internalTryRetries = lockStatsLogger.getCounter("internalTryRetries");
-    }
-
-    private LockClosedException newLockClosedException() {
-        return new LockClosedException(lockPath, "Lock is already closed");
-    }
-
-    private synchronized void checkLockState() throws LockingException {
-        if (closed) {
-            throw newLockClosedException();
-        }
-        if (null != lockReacquireException) {
-            throw lockReacquireException;
-        }
-    }
-
-    /**
-     * Asynchronously acquire the lock. Technically the try phase of this operation--which adds us to the waiter
-     * list--is executed synchronously, but the lock wait itself doesn't block.
-     */
-    public synchronized Future<DistributedLock> asyncAcquire() {
-        if (null != lockAcquireFuture) {
-            return Future.exception(new UnexpectedException("Someone is already acquiring/acquired lock " + lockPath));
-        }
-        final Promise<DistributedLock> promise =
-                new Promise<DistributedLock>(new Function<Throwable, BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(Throwable cause) {
-                lockStateExecutor.submit(lockPath, new Runnable() {
-                    @Override
-                    public void run() {
-                        asyncClose();
-                    }
-                });
-                return BoxedUnit.UNIT;
-            }
-        });
-        final Stopwatch stopwatch = Stopwatch.createStarted();
-        promise.addEventListener(new FutureEventListener<DistributedLock>() {
-            @Override
-            public void onSuccess(DistributedLock lock) {
-                acquireStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-            }
-            @Override
-            public void onFailure(Throwable cause) {
-                acquireStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-                // release the lock if fail to acquire
-                asyncClose();
-            }
-        });
-        this.lockAcquireFuture = promise;
-        lockStateExecutor.submit(lockPath, new Runnable() {
-            @Override
-            public void run() {
-                doAsyncAcquire(promise, lockTimeout);
-            }
-        });
-        return promise;
-    }
-
-    void doAsyncAcquire(final Promise<DistributedLock> acquirePromise,
-                        final long lockTimeout) {
-        LOG.trace("Async Lock Acquire {}", lockPath);
-        try {
-            checkLockState();
-        } catch (IOException ioe) {
-            FutureUtils.setException(acquirePromise, ioe);
-            return;
-        }
-
-        lockFactory.createLock(lockPath, lockContext).addEventListener(OrderedFutureEventListener.of(
-                new FutureEventListener<SessionLock>() {
-            @Override
-            public void onSuccess(SessionLock lock) {
-                synchronized (DistributedLock.this) {
-                    if (closed) {
-                        LOG.info("Skipping tryLocking lock {} since it is already closed", lockPath);
-                        FutureUtils.setException(acquirePromise, newLockClosedException());
-                        return;
-                    }
-                }
-                synchronized (DistributedLock.this) {
-                    internalLock = lock;
-                    internalLock.setLockListener(DistributedLock.this);
-                }
-                asyncTryLock(lock, acquirePromise, lockTimeout);
-            }
-
-            @Override
-            public void onFailure(Throwable cause) {
-                FutureUtils.setException(acquirePromise, cause);
-            }
-        }, lockStateExecutor, lockPath));
-    }
-
-    void asyncTryLock(SessionLock lock,
-                      final Promise<DistributedLock> acquirePromise,
-                      final long lockTimeout) {
-        if (null != tryLockFuture) {
-            tryLockFuture.cancel();
-        }
-        tryLockFuture = lock.asyncTryLock(lockTimeout, TimeUnit.MILLISECONDS);
-        tryLockFuture.addEventListener(OrderedFutureEventListener.of(
-                new FutureEventListener<LockWaiter>() {
-                    @Override
-                    public void onSuccess(LockWaiter waiter) {
-                        synchronized (DistributedLock.this) {
-                            if (closed) {
-                                LOG.info("Skipping acquiring lock {} since it is already closed", lockPath);
-                                waiter.getAcquireFuture().raise(new LockingException(lockPath, "lock is already closed."));
-                                FutureUtils.setException(acquirePromise, newLockClosedException());
-                                return;
-                            }
-                        }
-                        tryLockFuture = null;
-                        lockWaiter = waiter;
-                        waitForAcquire(waiter, acquirePromise);
-                    }
-
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        FutureUtils.setException(acquirePromise, cause);
-                    }
-                }, lockStateExecutor, lockPath));
-    }
-
-    void waitForAcquire(final LockWaiter waiter,
-                        final Promise<DistributedLock> acquirePromise) {
-        waiter.getAcquireFuture().addEventListener(OrderedFutureEventListener.of(
-                new FutureEventListener<Boolean>() {
-                    @Override
-                    public void onSuccess(Boolean acquired) {
-                        LOG.info("{} acquired lock {}", waiter, lockPath);
-                        if (acquired) {
-                            FutureUtils.setValue(acquirePromise, DistributedLock.this);
-                        } else {
-                            FutureUtils.setException(acquirePromise,
-                                    new OwnershipAcquireFailedException(lockPath, waiter.getCurrentOwner()));
-                        }
-                    }
-
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        FutureUtils.setException(acquirePromise, cause);
-                    }
-                }, lockStateExecutor, lockPath));
-    }
+public interface DistributedLock extends AsyncCloseable {
 
     /**
-     * NOTE: The {@link LockListener#onExpired()} is already executed in lock executor.
+     * Asynchronously acquire the lock.
+     *
+     * @return future represents the acquire result.
      */
-    @Override
-    public void onExpired() {
-        try {
-            reacquireLock(false);
-        } catch (LockingException le) {
-            // should not happen
-            LOG.error("Locking exception on re-acquiring lock {} : ", lockPath, le);
-        }
-    }
+    Future<? extends DistributedLock> asyncAcquire();
 
     /**
-     * Check if hold lock, if it doesn't, then re-acquire the lock.
+     * Check if hold lock. If it doesn't, then re-acquire the lock.
      *
-     * @throws LockingException     if the lock attempt fails
+     * @throws LockingException if the lock attempt fails
+     * @see #checkOwnership()
      */
-    public synchronized void checkOwnershipAndReacquire() throws LockingException {
-        if (null == lockAcquireFuture || !lockAcquireFuture.isDefined()) {
-            throw new LockingException(lockPath, "check ownership before acquiring");
-        }
-
-        if (haveLock()) {
-            return;
-        }
-
-        // We may have just lost the lock because of a ZK session timeout
-        // not necessarily because someone else acquired the lock.
-        // In such cases just try to reacquire. If that fails, it will throw
-        reacquireLock(true);
-    }
+    void checkOwnershipAndReacquire() throws LockingException;
 
     /**
-     * Check if lock is held.
-     * If not, error out and do not reacquire. Use this in cases where there are many waiters by default
-     * and reacquire is unlikley to succeed.
+     * Check if the lock is held. If not, error out and do not re-acquire.
+     * Use this in cases where there are many waiters by default and re-acquire
+     * is unlikely to succeed.
      *
-     * @throws LockingException     if the lock attempt fails
+     * @throws LockingException if we lost the ownership
+     * @see #checkOwnershipAndReacquire()
      */
-    public synchronized void checkOwnership() throws LockingException {
-        if (null == lockAcquireFuture || !lockAcquireFuture.isDefined()) {
-            throw new LockingException(lockPath, "check ownership before acquiring");
-        }
-        if (!haveLock()) {
-            throw new LockingException(lockPath, "Lost lock ownership");
-        }
-    }
-
-    @VisibleForTesting
-    int getReacquireCount() {
-        return reacquireCount.get();
-    }
-
-    @VisibleForTesting
-    Future<DistributedLock> getLockReacquireFuture() {
-        return lockReacquireFuture;
-    }
-
-    @VisibleForTesting
-    Future<DistributedLock> getLockAcquireFuture() {
-        return lockAcquireFuture;
-    }
-
-    @VisibleForTesting
-    synchronized SessionLock getInternalLock() {
-        return internalLock;
-    }
-
-    @VisibleForTesting
-    LockWaiter getLockWaiter() {
-        return lockWaiter;
-    }
-
-    synchronized boolean haveLock() {
-        return !closed && internalLock != null && internalLock.isLockHeld();
-    }
-
-    void closeWaiter(final LockWaiter waiter,
-                     final Promise<Void> closePromise) {
-        if (null == waiter) {
-            interruptTryLock(tryLockFuture, closePromise);
-        } else {
-            waiter.getAcquireFuture().addEventListener(OrderedFutureEventListener.of(
-                    new FutureEventListener<Boolean>() {
-                        @Override
-                        public void onSuccess(Boolean value) {
-                            unlockInternalLock(closePromise);
-                        }
-                        @Override
-                        public void onFailure(Throwable cause) {
-                            unlockInternalLock(closePromise);
-                        }
-                    }, lockStateExecutor, lockPath));
-            FutureUtils.cancel(waiter.getAcquireFuture());
-        }
-    }
-
-    void interruptTryLock(final Future<LockWaiter> tryLockFuture,
-                          final Promise<Void> closePromise) {
-        if (null == tryLockFuture) {
-            unlockInternalLock(closePromise);
-        } else {
-            tryLockFuture.addEventListener(OrderedFutureEventListener.of(
-                    new FutureEventListener<LockWaiter>() {
-                        @Override
-                        public void onSuccess(LockWaiter waiter) {
-                            closeWaiter(waiter, closePromise);
-                        }
-                        @Override
-                        public void onFailure(Throwable cause) {
-                            unlockInternalLock(closePromise);
-                        }
-                    }, lockStateExecutor, lockPath));
-            FutureUtils.cancel(tryLockFuture);
-        }
-    }
-
-    synchronized void unlockInternalLock(final Promise<Void> closePromise) {
-        if (internalLock == null) {
-            FutureUtils.setValue(closePromise, null);
-        } else {
-            internalLock.asyncUnlock().ensure(new AbstractFunction0<BoxedUnit>() {
-                @Override
-                public BoxedUnit apply() {
-                    FutureUtils.setValue(closePromise, null);
-                    return BoxedUnit.UNIT;
-                }
-            });
-        }
-    }
-
-    @Override
-    public Future<Void> asyncClose() {
-        final Promise<Void> closePromise;
-        synchronized (this) {
-            if (closed) {
-                return closeFuture;
-            }
-            closed = true;
-            closeFuture = closePromise = new Promise<Void>();
-        }
-        final Promise<Void> closeWaiterFuture = new Promise<Void>();
-        closeWaiterFuture.addEventListener(OrderedFutureEventListener.of(new FutureEventListener<Void>() {
-            @Override
-            public void onSuccess(Void value) {
-                complete();
-            }
-            @Override
-            public void onFailure(Throwable cause) {
-                complete();
-            }
-
-            private void complete() {
-                FutureUtils.setValue(closePromise, null);
-            }
-        }, lockStateExecutor, lockPath));
-        lockStateExecutor.submit(lockPath, new Runnable() {
-            @Override
-            public void run() {
-                closeWaiter(lockWaiter, closeWaiterFuture);
-            }
-        });
-        return closePromise;
-    }
-
-    void internalReacquireLock(final AtomicInteger numRetries,
-                               final long lockTimeout,
-                               final Promise<DistributedLock> reacquirePromise) {
-        lockStateExecutor.submit(lockPath, new Runnable() {
-            @Override
-            public void run() {
-                doInternalReacquireLock(numRetries, lockTimeout, reacquirePromise);
-            }
-        });
-    }
-
-    void doInternalReacquireLock(final AtomicInteger numRetries,
-                                 final long lockTimeout,
-                                 final Promise<DistributedLock> reacquirePromise) {
-        internalTryRetries.inc();
-        Promise<DistributedLock> tryPromise = new Promise<DistributedLock>();
-        tryPromise.addEventListener(new FutureEventListener<DistributedLock>() {
-            @Override
-            public void onSuccess(DistributedLock lock) {
-                FutureUtils.setValue(reacquirePromise, lock);
-            }
-
-            @Override
-            public void onFailure(Throwable cause) {
-                if (cause instanceof OwnershipAcquireFailedException) {
-                    // the lock has been acquired by others
-                    FutureUtils.setException(reacquirePromise, cause);
-                } else {
-                    if (numRetries.getAndDecrement() > 0 && !closed) {
-                        internalReacquireLock(numRetries, lockTimeout, reacquirePromise);
-                    } else {
-                        FutureUtils.setException(reacquirePromise, cause);
-                    }
-                }
-            }
-        });
-        doAsyncAcquire(tryPromise, 0);
-    }
-
-    private Future<DistributedLock> reacquireLock(boolean throwLockAcquireException) throws LockingException {
-        final Stopwatch stopwatch = Stopwatch.createStarted();
-        Promise<DistributedLock> lockPromise;
-        synchronized (this) {
-            if (closed) {
-                throw newLockClosedException();
-            }
-            if (null != lockReacquireException) {
-                if (throwLockAcquireException) {
-                    throw lockReacquireException;
-                } else {
-                    return null;
-                }
-            }
-            if (null != lockReacquireFuture) {
-                return lockReacquireFuture;
-            }
-            LOG.info("reacquiring lock at {}", lockPath);
-            lockReacquireFuture = lockPromise = new Promise<DistributedLock>();
-            lockReacquireFuture.addEventListener(new FutureEventListener<DistributedLock>() {
-                @Override
-                public void onSuccess(DistributedLock lock) {
-                    // if re-acquire successfully, clear the state.
-                    synchronized (DistributedLock.this) {
-                        lockReacquireFuture = null;
-                    }
-                    reacquireStats.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
-                }
-
-                @Override
-                public void onFailure(Throwable cause) {
-                    synchronized (DistributedLock.this) {
-                        if (cause instanceof LockingException) {
-                            lockReacquireException = (LockingException) cause;
-                        } else {
-                            lockReacquireException = new LockingException(lockPath,
-                                    "Exception on re-acquiring lock", cause);
-                        }
-                    }
-                    reacquireStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
-                }
-            });
-        }
-        reacquireCount.incrementAndGet();
-        internalReacquireLock(new AtomicInteger(Integer.MAX_VALUE), 0, lockPromise);
-        return lockPromise;
-    }
+    void checkOwnership() throws LockingException;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/NopDistributedLock.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/NopDistributedLock.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/NopDistributedLock.java
new file mode 100644
index 0000000..75e32de
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/NopDistributedLock.java
@@ -0,0 +1,34 @@
+package com.twitter.distributedlog.lock;
+
+import com.twitter.distributedlog.exceptions.LockingException;
+import com.twitter.util.Future;
+
+/**
+ * An implementation of {@link DistributedLock} which does nothing.
+ */
+public class NopDistributedLock implements DistributedLock {
+
+    public static final DistributedLock INSTANCE = new NopDistributedLock();
+
+    private NopDistributedLock() {}
+
+    @Override
+    public Future<? extends DistributedLock> asyncAcquire() {
+        return Future.value(this);
+    }
+
+    @Override
+    public void checkOwnershipAndReacquire() throws LockingException {
+        // no-op
+    }
+
+    @Override
+    public void checkOwnership() throws LockingException {
+        // no-op
+    }
+
+    @Override
+    public Future<Void> asyncClose() {
+        return Future.Void();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/SessionLock.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/SessionLock.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/SessionLock.java
index 7bfc6c1..95cd593 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/SessionLock.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/SessionLock.java
@@ -70,7 +70,7 @@ public interface SessionLock {
      * <p>
      * <i>tryLock</i> here is effectively the combination of following asynchronous calls.
      * <pre>
-     *     DistributedLock lock = ...;
+     *     ZKDistributedLock lock = ...;
      *     Future<LockWaiter> attemptFuture = lock.asyncTryLock(...);
      *
      *     boolean acquired = waiter.waitForAcquireQuietly();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKDistributedLock.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKDistributedLock.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKDistributedLock.java
new file mode 100644
index 0000000..7e9f35b
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKDistributedLock.java
@@ -0,0 +1,537 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.lock;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import com.twitter.concurrent.AsyncSemaphore;
+import com.twitter.distributedlog.exceptions.LockingException;
+import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
+import com.twitter.distributedlog.exceptions.UnexpectedException;
+import com.twitter.distributedlog.util.FutureUtils;
+import com.twitter.distributedlog.util.FutureUtils.OrderedFutureEventListener;
+import com.twitter.distributedlog.util.OrderedScheduler;
+import com.twitter.util.Function;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction0;
+import scala.runtime.BoxedUnit;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Distributed lock, using ZooKeeper.
+ * <p/>
+ * The lock is vulnerable to timing issues. For example, the process could
+ * encounter a really long GC cycle between acquiring the lock, and writing to
+ * a ledger. This could have timed out the lock, and another process could have
+ * acquired the lock and started writing to bookkeeper. Therefore other
+ * mechanisms are required to ensure correctness (i.e. Fencing).
+ * <p/>
+ * The lock is only allowed to acquire once. If the lock is acquired successfully,
+ * the caller holds the ownership until it loses the ownership either because of
+ * others already acquired the lock when session expired or explicitly close it.
+ * <p>
+ * The caller could use {@link #checkOwnership()} or {@link #checkOwnershipAndReacquire()}
+ * to check if it still holds the lock. If it doesn't hold the lock, the caller should
+ * give up the ownership and close the lock.
+ * <h3>Metrics</h3>
+ * All the lock related stats are exposed under `lock`.
+ * <ul>
+ * <li>lock/acquire: opstats. latency spent on acquiring a lock.
+ * <li>lock/reacquire: opstats. latency spent on re-acquiring a lock.
+ * <li>lock/internalTryRetries: counter. the number of retries on re-creating internal locks.
+ * </ul>
+ * Other internal lock related stats are also exposed under `lock`. See {@link SessionLock}
+ * for details.
+ */
+public class ZKDistributedLock implements LockListener, DistributedLock {
+
+    static final Logger LOG = LoggerFactory.getLogger(ZKDistributedLock.class);
+
+    private final SessionLockFactory lockFactory;
+    private final OrderedScheduler lockStateExecutor;
+    private final String lockPath;
+    private final long lockTimeout;
+    private final DistributedLockContext lockContext = new DistributedLockContext();
+
+    private final AsyncSemaphore lockSemaphore = new AsyncSemaphore(1);
+    // We have two lock acquire futures:
+    // 1. lock acquire future: for the initial acquire op
+    // 2. lock reacquire future: for reacquire necessary when session expires, lock is closed
+    private Future<ZKDistributedLock> lockAcquireFuture = null;
+    private Future<ZKDistributedLock> lockReacquireFuture = null;
+    // following variable tracking the status of acquire process
+    //   => create (internalLock) => tryLock (tryLockFuture) => waitForAcquire (lockWaiter)
+    private SessionLock internalLock = null;
+    private Future<LockWaiter> tryLockFuture = null;
+    private LockWaiter lockWaiter = null;
+    // exception indicating if the reacquire failed
+    private LockingException lockReacquireException = null;
+    // closeFuture
+    private volatile boolean closed = false;
+    private Future<Void> closeFuture = null;
+
+    // A counter to track how many re-acquires happened during a lock's life cycle.
+    private final AtomicInteger reacquireCount = new AtomicInteger(0);
+    private final StatsLogger lockStatsLogger;
+    private final OpStatsLogger acquireStats;
+    private final OpStatsLogger reacquireStats;
+    private final Counter internalTryRetries;
+
+    public ZKDistributedLock(
+            OrderedScheduler lockStateExecutor,
+            SessionLockFactory lockFactory,
+            String lockPath,
+            long lockTimeout,
+            StatsLogger statsLogger) {
+        this.lockStateExecutor = lockStateExecutor;
+        this.lockPath = lockPath;
+        this.lockTimeout = lockTimeout;
+        this.lockFactory = lockFactory;
+
+        lockStatsLogger = statsLogger.scope("lock");
+        acquireStats = lockStatsLogger.getOpStatsLogger("acquire");
+        reacquireStats = lockStatsLogger.getOpStatsLogger("reacquire");
+        internalTryRetries = lockStatsLogger.getCounter("internalTryRetries");
+    }
+
+    private LockClosedException newLockClosedException() {
+        return new LockClosedException(lockPath, "Lock is already closed");
+    }
+
+    private synchronized void checkLockState() throws LockingException {
+        if (closed) {
+            throw newLockClosedException();
+        }
+        if (null != lockReacquireException) {
+            throw lockReacquireException;
+        }
+    }
+
+    /**
+     * Asynchronously acquire the lock. Technically the try phase of this operation--which adds us to the waiter
+     * list--is executed synchronously, but the lock wait itself doesn't block.
+     */
+    public synchronized Future<ZKDistributedLock> asyncAcquire() {
+        if (null != lockAcquireFuture) {
+            return Future.exception(new UnexpectedException("Someone is already acquiring/acquired lock " + lockPath));
+        }
+        final Promise<ZKDistributedLock> promise =
+                new Promise<ZKDistributedLock>(new Function<Throwable, BoxedUnit>() {
+            @Override
+            public BoxedUnit apply(Throwable cause) {
+                lockStateExecutor.submit(lockPath, new Runnable() {
+                    @Override
+                    public void run() {
+                        asyncClose();
+                    }
+                });
+                return BoxedUnit.UNIT;
+            }
+        });
+        final Stopwatch stopwatch = Stopwatch.createStarted();
+        promise.addEventListener(new FutureEventListener<ZKDistributedLock>() {
+            @Override
+            public void onSuccess(ZKDistributedLock lock) {
+                acquireStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+            }
+            @Override
+            public void onFailure(Throwable cause) {
+                acquireStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+                // release the lock if fail to acquire
+                asyncClose();
+            }
+        });
+        this.lockAcquireFuture = promise;
+        lockStateExecutor.submit(lockPath, new Runnable() {
+            @Override
+            public void run() {
+                doAsyncAcquireWithSemaphore(promise, lockTimeout);
+            }
+        });
+        return promise;
+    }
+
+    void doAsyncAcquireWithSemaphore(final Promise<ZKDistributedLock> acquirePromise,
+                                     final long lockTimeout) {
+        lockSemaphore.acquireAndRun(new AbstractFunction0<Future<ZKDistributedLock>>() {
+            @Override
+            public Future<ZKDistributedLock> apply() {
+                doAsyncAcquire(acquirePromise, lockTimeout);
+                return acquirePromise;
+            }
+        });
+    }
+
+    void doAsyncAcquire(final Promise<ZKDistributedLock> acquirePromise,
+                        final long lockTimeout) {
+        LOG.trace("Async Lock Acquire {}", lockPath);
+        try {
+            checkLockState();
+        } catch (IOException ioe) {
+            FutureUtils.setException(acquirePromise, ioe);
+            return;
+        }
+
+        if (haveLock()) {
+            // it already hold the lock
+            FutureUtils.setValue(acquirePromise, this);
+            return;
+        }
+
+        lockFactory.createLock(lockPath, lockContext).addEventListener(OrderedFutureEventListener.of(
+                new FutureEventListener<SessionLock>() {
+            @Override
+            public void onSuccess(SessionLock lock) {
+                synchronized (ZKDistributedLock.this) {
+                    if (closed) {
+                        LOG.info("Skipping tryLocking lock {} since it is already closed", lockPath);
+                        FutureUtils.setException(acquirePromise, newLockClosedException());
+                        return;
+                    }
+                }
+                synchronized (ZKDistributedLock.this) {
+                    internalLock = lock;
+                    internalLock.setLockListener(ZKDistributedLock.this);
+                }
+                asyncTryLock(lock, acquirePromise, lockTimeout);
+            }
+
+            @Override
+            public void onFailure(Throwable cause) {
+                FutureUtils.setException(acquirePromise, cause);
+            }
+        }, lockStateExecutor, lockPath));
+    }
+
+    void asyncTryLock(SessionLock lock,
+                      final Promise<ZKDistributedLock> acquirePromise,
+                      final long lockTimeout) {
+        if (null != tryLockFuture) {
+            tryLockFuture.cancel();
+        }
+        tryLockFuture = lock.asyncTryLock(lockTimeout, TimeUnit.MILLISECONDS);
+        tryLockFuture.addEventListener(OrderedFutureEventListener.of(
+                new FutureEventListener<LockWaiter>() {
+                    @Override
+                    public void onSuccess(LockWaiter waiter) {
+                        synchronized (ZKDistributedLock.this) {
+                            if (closed) {
+                                LOG.info("Skipping acquiring lock {} since it is already closed", lockPath);
+                                waiter.getAcquireFuture().raise(new LockingException(lockPath, "lock is already closed."));
+                                FutureUtils.setException(acquirePromise, newLockClosedException());
+                                return;
+                            }
+                        }
+                        tryLockFuture = null;
+                        lockWaiter = waiter;
+                        waitForAcquire(waiter, acquirePromise);
+                    }
+
+                    @Override
+                    public void onFailure(Throwable cause) {
+                        FutureUtils.setException(acquirePromise, cause);
+                    }
+                }, lockStateExecutor, lockPath));
+    }
+
+    void waitForAcquire(final LockWaiter waiter,
+                        final Promise<ZKDistributedLock> acquirePromise) {
+        waiter.getAcquireFuture().addEventListener(OrderedFutureEventListener.of(
+                new FutureEventListener<Boolean>() {
+                    @Override
+                    public void onSuccess(Boolean acquired) {
+                        LOG.info("{} acquired lock {}", waiter, lockPath);
+                        if (acquired) {
+                            FutureUtils.setValue(acquirePromise, ZKDistributedLock.this);
+                        } else {
+                            FutureUtils.setException(acquirePromise,
+                                    new OwnershipAcquireFailedException(lockPath, waiter.getCurrentOwner()));
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Throwable cause) {
+                        FutureUtils.setException(acquirePromise, cause);
+                    }
+                }, lockStateExecutor, lockPath));
+    }
+
+    /**
+     * NOTE: The {@link LockListener#onExpired()} is already executed in lock executor.
+     */
+    @Override
+    public void onExpired() {
+        try {
+            reacquireLock(false);
+        } catch (LockingException le) {
+            // should not happen
+            LOG.error("Locking exception on re-acquiring lock {} : ", lockPath, le);
+        }
+    }
+
+    /**
+     * Check if hold lock, if it doesn't, then re-acquire the lock.
+     *
+     * @throws LockingException     if the lock attempt fails
+     */
+    public synchronized void checkOwnershipAndReacquire() throws LockingException {
+        if (null == lockAcquireFuture || !lockAcquireFuture.isDefined()) {
+            throw new LockingException(lockPath, "check ownership before acquiring");
+        }
+
+        if (haveLock()) {
+            return;
+        }
+
+        // We may have just lost the lock because of a ZK session timeout
+        // not necessarily because someone else acquired the lock.
+        // In such cases just try to reacquire. If that fails, it will throw
+        reacquireLock(true);
+    }
+
+    /**
+     * Check if lock is held.
+     * If not, error out and do not reacquire. Use this in cases where there are many waiters by default
+     * and reacquire is unlikley to succeed.
+     *
+     * @throws LockingException     if the lock attempt fails
+     */
+    public synchronized void checkOwnership() throws LockingException {
+        if (null == lockAcquireFuture || !lockAcquireFuture.isDefined()) {
+            throw new LockingException(lockPath, "check ownership before acquiring");
+        }
+        if (!haveLock()) {
+            throw new LockingException(lockPath, "Lost lock ownership");
+        }
+    }
+
+    @VisibleForTesting
+    int getReacquireCount() {
+        return reacquireCount.get();
+    }
+
+    @VisibleForTesting
+    Future<ZKDistributedLock> getLockReacquireFuture() {
+        return lockReacquireFuture;
+    }
+
+    @VisibleForTesting
+    Future<ZKDistributedLock> getLockAcquireFuture() {
+        return lockAcquireFuture;
+    }
+
+    @VisibleForTesting
+    synchronized SessionLock getInternalLock() {
+        return internalLock;
+    }
+
+    @VisibleForTesting
+    LockWaiter getLockWaiter() {
+        return lockWaiter;
+    }
+
+    synchronized boolean haveLock() {
+        return !closed && internalLock != null && internalLock.isLockHeld();
+    }
+
+    void closeWaiter(final LockWaiter waiter,
+                     final Promise<Void> closePromise) {
+        if (null == waiter) {
+            interruptTryLock(tryLockFuture, closePromise);
+        } else {
+            waiter.getAcquireFuture().addEventListener(OrderedFutureEventListener.of(
+                    new FutureEventListener<Boolean>() {
+                        @Override
+                        public void onSuccess(Boolean value) {
+                            unlockInternalLock(closePromise);
+                        }
+                        @Override
+                        public void onFailure(Throwable cause) {
+                            unlockInternalLock(closePromise);
+                        }
+                    }, lockStateExecutor, lockPath));
+            FutureUtils.cancel(waiter.getAcquireFuture());
+        }
+    }
+
+    void interruptTryLock(final Future<LockWaiter> tryLockFuture,
+                          final Promise<Void> closePromise) {
+        if (null == tryLockFuture) {
+            unlockInternalLock(closePromise);
+        } else {
+            tryLockFuture.addEventListener(OrderedFutureEventListener.of(
+                    new FutureEventListener<LockWaiter>() {
+                        @Override
+                        public void onSuccess(LockWaiter waiter) {
+                            closeWaiter(waiter, closePromise);
+                        }
+                        @Override
+                        public void onFailure(Throwable cause) {
+                            unlockInternalLock(closePromise);
+                        }
+                    }, lockStateExecutor, lockPath));
+            FutureUtils.cancel(tryLockFuture);
+        }
+    }
+
+    synchronized void unlockInternalLock(final Promise<Void> closePromise) {
+        if (internalLock == null) {
+            FutureUtils.setValue(closePromise, null);
+        } else {
+            internalLock.asyncUnlock().ensure(new AbstractFunction0<BoxedUnit>() {
+                @Override
+                public BoxedUnit apply() {
+                    FutureUtils.setValue(closePromise, null);
+                    return BoxedUnit.UNIT;
+                }
+            });
+        }
+    }
+
+    @Override
+    public Future<Void> asyncClose() {
+        final Promise<Void> closePromise;
+        synchronized (this) {
+            if (closed) {
+                return closeFuture;
+            }
+            closed = true;
+            closeFuture = closePromise = new Promise<Void>();
+        }
+        final Promise<Void> closeWaiterFuture = new Promise<Void>();
+        closeWaiterFuture.addEventListener(OrderedFutureEventListener.of(new FutureEventListener<Void>() {
+            @Override
+            public void onSuccess(Void value) {
+                complete();
+            }
+            @Override
+            public void onFailure(Throwable cause) {
+                complete();
+            }
+
+            private void complete() {
+                FutureUtils.setValue(closePromise, null);
+            }
+        }, lockStateExecutor, lockPath));
+        lockStateExecutor.submit(lockPath, new Runnable() {
+            @Override
+            public void run() {
+                closeWaiter(lockWaiter, closeWaiterFuture);
+            }
+        });
+        return closePromise;
+    }
+
+    void internalReacquireLock(final AtomicInteger numRetries,
+                               final long lockTimeout,
+                               final Promise<ZKDistributedLock> reacquirePromise) {
+        lockStateExecutor.submit(lockPath, new Runnable() {
+            @Override
+            public void run() {
+                doInternalReacquireLock(numRetries, lockTimeout, reacquirePromise);
+            }
+        });
+    }
+
+    void doInternalReacquireLock(final AtomicInteger numRetries,
+                                 final long lockTimeout,
+                                 final Promise<ZKDistributedLock> reacquirePromise) {
+        internalTryRetries.inc();
+        Promise<ZKDistributedLock> tryPromise = new Promise<ZKDistributedLock>();
+        tryPromise.addEventListener(new FutureEventListener<ZKDistributedLock>() {
+            @Override
+            public void onSuccess(ZKDistributedLock lock) {
+                FutureUtils.setValue(reacquirePromise, lock);
+            }
+
+            @Override
+            public void onFailure(Throwable cause) {
+                if (cause instanceof OwnershipAcquireFailedException) {
+                    // the lock has been acquired by others
+                    FutureUtils.setException(reacquirePromise, cause);
+                } else {
+                    if (numRetries.getAndDecrement() > 0 && !closed) {
+                        internalReacquireLock(numRetries, lockTimeout, reacquirePromise);
+                    } else {
+                        FutureUtils.setException(reacquirePromise, cause);
+                    }
+                }
+            }
+        });
+        doAsyncAcquireWithSemaphore(tryPromise, 0);
+    }
+
+    private Future<ZKDistributedLock> reacquireLock(boolean throwLockAcquireException) throws LockingException {
+        final Stopwatch stopwatch = Stopwatch.createStarted();
+        Promise<ZKDistributedLock> lockPromise;
+        synchronized (this) {
+            if (closed) {
+                throw newLockClosedException();
+            }
+            if (null != lockReacquireException) {
+                if (throwLockAcquireException) {
+                    throw lockReacquireException;
+                } else {
+                    return null;
+                }
+            }
+            if (null != lockReacquireFuture) {
+                return lockReacquireFuture;
+            }
+            LOG.info("reacquiring lock at {}", lockPath);
+            lockReacquireFuture = lockPromise = new Promise<ZKDistributedLock>();
+            lockReacquireFuture.addEventListener(new FutureEventListener<ZKDistributedLock>() {
+                @Override
+                public void onSuccess(ZKDistributedLock lock) {
+                    // if re-acquire successfully, clear the state.
+                    synchronized (ZKDistributedLock.this) {
+                        lockReacquireFuture = null;
+                    }
+                    reacquireStats.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+                }
+
+                @Override
+                public void onFailure(Throwable cause) {
+                    synchronized (ZKDistributedLock.this) {
+                        if (cause instanceof LockingException) {
+                            lockReacquireException = (LockingException) cause;
+                        } else {
+                            lockReacquireException = new LockingException(lockPath,
+                                    "Exception on re-acquiring lock", cause);
+                        }
+                    }
+                    reacquireStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+                }
+            });
+        }
+        reacquireCount.incrementAndGet();
+        internalReacquireLock(new AtomicInteger(Integer.MAX_VALUE), 0, lockPromise);
+        return lockPromise;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
index 2c13b06..3355c9b 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
@@ -536,6 +536,7 @@ public class DLMTestUtil {
         } catch (org.apache.commons.configuration.ConfigurationException ex) {
             LOG.warn("loading conf failed", ex);
         }
+        conf.setAllowLoopback(true);
         return conf;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
index 06cf079..787f74f 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
@@ -51,7 +51,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Stopwatch;
-import com.twitter.distributedlog.annotations.DistributedLogAnnotations;
 import com.twitter.distributedlog.exceptions.DLIllegalStateException;
 import com.twitter.distributedlog.exceptions.EndOfStreamException;
 import com.twitter.distributedlog.exceptions.IdleReaderException;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
index 0c8ca9a..3fa3e7d 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.twitter.distributedlog.exceptions.AlreadyTruncatedTransactionException;
+import com.twitter.distributedlog.exceptions.BKTransmitException;
 import com.twitter.distributedlog.exceptions.LogEmptyException;
 import com.twitter.distributedlog.exceptions.LogNotFoundException;
 import com.twitter.distributedlog.exceptions.LogReadException;
@@ -35,6 +36,7 @@ import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.Utils;
+import org.apache.bookkeeper.client.BKException;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -322,6 +324,28 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
     }
 
     @Test(timeout = 60000)
+    public void testTwoWritersOnLockDisabled() throws Exception {
+        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
+        confLocal.addConfiguration(conf);
+        confLocal.setOutputBufferSize(0);
+        confLocal.setWriteLockEnabled(false);
+        String name = "distrlog-two-writers-lock-disabled";
+        DistributedLogManager manager = createNewDLM(confLocal, name);
+        AsyncLogWriter writer1 = FutureUtils.result(manager.openAsyncLogWriter());
+        FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(1L)));
+        AsyncLogWriter writer2 = FutureUtils.result(manager.openAsyncLogWriter());
+        FutureUtils.result(writer2.write(DLMTestUtil.getLogRecordInstance(2L)));
+
+        // write a record to writer 1 again
+        try {
+            FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(3L)));
+            fail("Should fail writing record to writer 1 again as writer 2 took over the ownership");
+        } catch (BKTransmitException bkte) {
+            assertEquals(BKException.Code.LedgerFencedException, bkte.getBKResultCode());
+        }
+    }
+
+    @Test(timeout = 60000)
     public void testSimpleRead() throws Exception {
         String name = "distrlog-simpleread";
         DistributedLogManager dlm = createNewDLM(conf, name);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java
index 0baf9fe..43e55e4 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java
@@ -24,7 +24,7 @@ import com.twitter.distributedlog.exceptions.WriteException;
 import com.twitter.distributedlog.impl.BKLogSegmentEntryWriter;
 import com.twitter.distributedlog.io.Abortables;
 import com.twitter.distributedlog.lock.SessionLockFactory;
-import com.twitter.distributedlog.lock.DistributedLock;
+import com.twitter.distributedlog.lock.ZKDistributedLock;
 import com.twitter.distributedlog.lock.ZKSessionLockFactory;
 import com.twitter.distributedlog.metadata.BKDLConfig;
 import com.twitter.distributedlog.util.ConfUtils;
@@ -128,9 +128,9 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
         return confLocal;
     }
 
-    private DistributedLock createLock(String path,
-                                       ZooKeeperClient zkClient,
-                                       boolean acquireLock)
+    private ZKDistributedLock createLock(String path,
+                                         ZooKeeperClient zkClient,
+                                         boolean acquireLock)
             throws Exception {
         try {
             Await.result(Utils.zkAsyncCreateFullPathOptimistic(zkClient, path, new byte[0],
@@ -147,7 +147,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
                 conf.getZKSessionTimeoutMilliseconds(),
                 NullStatsLogger.INSTANCE
         );
-        DistributedLock lock = new DistributedLock(
+        ZKDistributedLock lock = new ZKDistributedLock(
                 lockStateExecutor,
                 lockFactory,
                 path,
@@ -161,7 +161,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
     }
 
     private void closeWriterAndLock(BKLogSegmentWriter writer,
-                                    DistributedLock lock)
+                                    ZKDistributedLock lock)
             throws IOException {
         try {
             FutureUtils.result(writer.asyncClose());
@@ -171,7 +171,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
     }
 
     private void abortWriterAndLock(BKLogSegmentWriter writer,
-                                    DistributedLock lock)
+                                    ZKDistributedLock lock)
             throws IOException {
         try {
             Abortables.abort(writer, false);
@@ -183,7 +183,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
     private BKLogSegmentWriter createLogSegmentWriter(DistributedLogConfiguration conf,
                                                       long logSegmentSequenceNumber,
                                                       long startTxId,
-                                                      DistributedLock lock) throws Exception {
+                                                      ZKDistributedLock lock) throws Exception {
         LedgerHandle lh = bkc.get().createLedger(3, 2, 2,
                 BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8));
         return new BKLogSegmentWriter(
@@ -230,12 +230,12 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
         confLocal.setImmediateFlushEnabled(false);
         confLocal.setOutputBufferSize(Integer.MAX_VALUE);
         confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
-        DistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true);
+        ZKDistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true);
         BKLogSegmentWriter writer =
                 createLogSegmentWriter(confLocal, 0L, -1L, lock);
         // Use another lock to wait for writer releasing lock
-        DistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false);
-        Future<DistributedLock> lockFuture0 = lock0.asyncAcquire();
+        ZKDistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false);
+        Future<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
         // add 10 records
         int numRecords = 10;
         List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords);
@@ -292,12 +292,12 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
         confLocal.setImmediateFlushEnabled(false);
         confLocal.setOutputBufferSize(Integer.MAX_VALUE);
         confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
-        DistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true);
+        ZKDistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true);
         BKLogSegmentWriter writer =
                 createLogSegmentWriter(confLocal, 0L, -1L, lock);
         // Use another lock to wait for writer releasing lock
-        DistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false);
-        Future<DistributedLock> lockFuture0 = lock0.asyncAcquire();
+        ZKDistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false);
+        Future<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
         // add 10 records
         int numRecords = 10;
         List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords);
@@ -368,12 +368,12 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
         confLocal.setImmediateFlushEnabled(false);
         confLocal.setOutputBufferSize(Integer.MAX_VALUE);
         confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
-        DistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true);
+        ZKDistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true);
         BKLogSegmentWriter writer =
                 createLogSegmentWriter(confLocal, 0L, -1L, lock);
         // Use another lock to wait for writer releasing lock
-        DistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false);
-        Future<DistributedLock> lockFuture0 = lock0.asyncAcquire();
+        ZKDistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false);
+        Future<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
         // add 10 records
         int numRecords = 10;
         List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords);
@@ -440,12 +440,12 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
         confLocal.setImmediateFlushEnabled(false);
         confLocal.setOutputBufferSize(Integer.MAX_VALUE);
         confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
-        DistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true);
+        ZKDistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true);
         BKLogSegmentWriter writer =
                 createLogSegmentWriter(confLocal, 0L, -1L, lock);
         // Use another lock to wait for writer releasing lock
-        DistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false);
-        Future<DistributedLock> lockFuture0 = lock0.asyncAcquire();
+        ZKDistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false);
+        Future<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
         // add 10 records
         int numRecords = 10;
         List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords);
@@ -512,12 +512,12 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
         confLocal.setImmediateFlushEnabled(false);
         confLocal.setOutputBufferSize(Integer.MAX_VALUE);
         confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
-        DistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true);
+        ZKDistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true);
         BKLogSegmentWriter writer =
                 createLogSegmentWriter(confLocal, 0L, -1L, lock);
         // Use another lock to wait for writer releasing lock
-        DistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false);
-        Future<DistributedLock> lockFuture0 = lock0.asyncAcquire();
+        ZKDistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false);
+        Future<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
         // add 10 records
         int numRecords = 10;
         List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords);
@@ -621,7 +621,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
         confLocal.setImmediateFlushEnabled(false);
         confLocal.setOutputBufferSize(Integer.MAX_VALUE);
         confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
-        DistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true);
+        ZKDistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true);
         BKLogSegmentWriter writer =
                 createLogSegmentWriter(confLocal, 0L, -1L, lock);
         // add 10 records
@@ -685,7 +685,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
         confLocal.setOutputBufferSize(Integer.MAX_VALUE);
         confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
         confLocal.setDurableWriteEnabled(false);
-        DistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true);
+        ZKDistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true);
         BKLogSegmentWriter writer =
                 createLogSegmentWriter(confLocal, 0L, -1L, lock);
 
@@ -713,7 +713,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
         confLocal.setOutputBufferSize(Integer.MAX_VALUE);
         confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
         confLocal.setDurableWriteEnabled(false);
-        DistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true);
+        ZKDistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true);
         BKLogSegmentWriter writer =
                 createLogSegmentWriter(confLocal, 0L, -1L, lock);
 
@@ -741,7 +741,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
         confLocal.setOutputBufferSize(Integer.MAX_VALUE);
         confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
         confLocal.setDurableWriteEnabled(false);
-        DistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true);
+        ZKDistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true);
         BKLogSegmentWriter writer =
                 createLogSegmentWriter(confLocal, 0L, -1L, lock);
 
@@ -780,7 +780,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
         confLocal.setOutputBufferSize(Integer.MAX_VALUE);
         confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
         confLocal.setDurableWriteEnabled(false);
-        DistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true);
+        ZKDistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true);
         BKLogSegmentWriter writer =
                 createLogSegmentWriter(confLocal, 0L, -1L, lock);
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java
index d348492..74bafb3 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java
@@ -37,7 +37,6 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,17 +71,20 @@ public class TestDistributedLogBase {
 
     @BeforeClass
     public static void setupCluster() throws Exception {
-        boolean success = false;
-        int retries = 0;
         File zkTmpDir = IOUtils.createTempDir("zookeeper", "distrlog");
         tmpDirs.add(zkTmpDir);
         Pair<ZooKeeperServerShim, Integer> serverAndPort = LocalDLMEmulator.runZookeeperOnAnyPort(zkTmpDir);
         zks = serverAndPort.getLeft();
         zkPort = serverAndPort.getRight();
-        bkutil = new LocalDLMEmulator(numBookies, "127.0.0.1", zkPort, DLMTestUtil.loadTestBkConf());
+        bkutil = LocalDLMEmulator.newBuilder()
+                .numBookies(numBookies)
+                .zkHost("127.0.0.1")
+                .zkPort(zkPort)
+                .serverConf(DLMTestUtil.loadTestBkConf())
+                .shouldStartZK(false)
+                .build();
         bkutil.start();
         zkServers = "127.0.0.1:" + zkPort;
-        success = true;
     }
 
     @AfterClass


Mime
View raw message