distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [06/31] incubator-distributedlog git commit: DL-117: Stream metadata store
Date Fri, 30 Dec 2016 00:07:20 GMT
DL-117: Stream metadata store

This change abstracts the ZooKeeper operations into a stream metadata store, so that ZooKeeper can easily be replaced with other metadata stores.

So the metadata operations in distributedlog now are managed by 3 classes:

- LogMetadataStore: the namespace metadata store. It manages the location (URI) mapping for streams and handles namespace operations.
- LogStreamMetadataStore: the stream metadata store. It manages the metadata for a single stream, such as managing read/write locks, retrieving/creating stream metadata, and deleting metadata.
- LogSegmentMetadataStore: the segment metadata store. It manages the log segment metadata for individual log segments.

LogMetadataStore and LogSegmentMetadataStore already exist. This change focuses on LogStreamMetadataStore.

Changed:

* abstract all the ZooKeeper metadata operations in the log handlers into LogStreamMetadataStore
* remove the disabling of the max tx id sanity check, as the maxTxId update is part of the metadata update transaction

Not changed:

The names of ZKLogMetadataForReader and ZKLogMetadataForWriter are not changed. I will send out a follow-up change to rename these two classes, as they are no longer related to ZooKeeper.


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/b91d49a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/b91d49a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/b91d49a8

Branch: refs/heads/master
Commit: b91d49a89e7c9133eb2807ced2e7dbb8aba72e02
Parents: 9d467a6
Author: Sijie Guo <sijieg@twitter.com>
Authored: Wed Nov 30 17:14:05 2016 -0800
Committer: Sijie Guo <sijieg@twitter.com>
Committed: Thu Dec 29 02:08:33 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/BKAsyncLogReaderDLSN.java    |   6 +-
 .../distributedlog/BKDistributedLogManager.java | 180 ++----
 .../BKDistributedLogNamespace.java              | 137 ++--
 .../twitter/distributedlog/BKLogHandler.java    |  74 +--
 .../distributedlog/BKLogReadHandler.java        | 126 +---
 .../distributedlog/BKLogWriteHandler.java       | 211 +++----
 .../distributedlog/BKSyncLogReaderDLSN.java     |   1 -
 .../DistributedLogConfiguration.java            |   2 +
 .../distributedlog/MaxLogSegmentSequenceNo.java |  40 +-
 .../com/twitter/distributedlog/MaxTxId.java     |  76 +--
 .../impl/ZKLogSegmentMetadataStore.java         |  63 +-
 .../impl/metadata/ZKLogMetadata.java            |  23 +-
 .../impl/metadata/ZKLogMetadataForWriter.java   | 309 +--------
 .../impl/metadata/ZKLogStreamMetadataStore.java | 630 +++++++++++++++++++
 .../logsegment/LogSegmentMetadataStore.java     |  24 +-
 .../distributedlog/metadata/BKDLConfig.java     |   5 +-
 .../LogSegmentMetadataStoreUpdater.java         |   4 +-
 .../metadata/LogStreamMetadataStore.java        | 116 ++++
 .../tools/DistributedLogTool.java               |   3 +-
 .../util/LimitedPermitManager.java              | 194 ------
 .../twitter/distributedlog/zk/DefaultZKOp.java  |  20 +-
 .../distributedlog/zk/LimitedPermitManager.java | 195 ++++++
 .../distributedlog/zk/ZKVersionedSetOp.java     |  11 +-
 .../com/twitter/distributedlog/DLMTestUtil.java |   5 +-
 .../TestAppendOnlyStreamWriter.java             |   5 +-
 .../TestBKDistributedLogManager.java            |  65 +-
 .../TestBKDistributedLogNamespace.java          |   6 +-
 .../distributedlog/TestDistributedLogBase.java  |   3 +-
 .../distributedlog/TestRollLogSegments.java     |   1 +
 .../distributedlog/bk/TestLedgerAllocator.java  |   4 +-
 .../impl/TestZKLogSegmentMetadataStore.java     |  73 ++-
 .../metadata/TestZKLogMetadataForWriter.java    | 327 ----------
 ...TestZKLogMetadataForWriterUtilFunctions.java | 204 ------
 .../metadata/TestZKLogStreamMetadataStore.java  | 326 ++++++++++
 .../TestZKLogStreamMetadataStoreUtils.java      | 206 ++++++
 .../distributedlog/util/TestPermitManager.java  |   1 +
 36 files changed, 1875 insertions(+), 1801 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
index cf792e3..2ca064c 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
@@ -30,7 +30,6 @@ import com.twitter.distributedlog.exceptions.UnexpectedException;
 import com.twitter.distributedlog.injector.AsyncFailureInjector;
 import com.twitter.distributedlog.injector.AsyncRandomFailureInjector;
 import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
 import com.twitter.util.Promise;
@@ -210,7 +209,6 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
 
     BKAsyncLogReaderDLSN(BKDistributedLogManager bkdlm,
                          ScheduledExecutorService executorService,
-                         OrderedScheduler lockStateExecutor,
                          DLSN startDLSN,
                          Optional<String> subscriberId,
                          boolean returnEndOfStreamRecord,
@@ -219,7 +217,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
         this.bkDistributedLogManager = bkdlm;
         this.executorService = executorService;
         this.bkLedgerManager = bkDistributedLogManager.createReadHandler(subscriberId,
-                lockStateExecutor, this, deserializeRecordSet, true);
+                this, deserializeRecordSet, true);
         LOG.debug("Starting async reader at {}", startDLSN);
         this.startDLSN = startDLSN;
         this.scheduleDelayStopwatch = Stopwatch.createUnstarted();
@@ -414,7 +412,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio
         final PendingReadRequest readRequest = new PendingReadRequest(numEntries, deadlineTime, deadlineTimeUnit);
 
         if (!readAheadStarted) {
-            bkLedgerManager.checkLogStreamExistsAsync().addEventListener(new FutureEventListener<Void>() {
+            bkLedgerManager.checkLogStreamExists().addEventListener(new FutureEventListener<Void>() {
                 @Override
                 public void onSuccess(Void value) {
                     try {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
index ac37f3a..0a34caa 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
@@ -29,25 +29,22 @@ import com.twitter.distributedlog.bk.SimpleLedgerAllocator;
 import com.twitter.distributedlog.callback.LogSegmentListener;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.exceptions.AlreadyClosedException;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
 import com.twitter.distributedlog.exceptions.LogEmptyException;
 import com.twitter.distributedlog.exceptions.LogNotFoundException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
 import com.twitter.distributedlog.function.CloseAsyncCloseableFunction;
 import com.twitter.distributedlog.function.GetVersionedValueFunction;
-import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore;
 import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
 import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
+import com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore;
 import com.twitter.distributedlog.io.AsyncCloseable;
 import com.twitter.distributedlog.lock.DistributedLock;
 import com.twitter.distributedlog.lock.NopDistributedLock;
-import com.twitter.distributedlog.lock.SessionLockFactory;
 import com.twitter.distributedlog.lock.ZKDistributedLock;
-import com.twitter.distributedlog.lock.ZKSessionLockFactory;
 import com.twitter.distributedlog.logsegment.LogSegmentFilter;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
-import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
 import com.twitter.distributedlog.metadata.BKDLConfig;
+import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
 import com.twitter.distributedlog.stats.BroadCastStatsLogger;
 import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger;
 import com.twitter.distributedlog.subscription.SubscriptionStateStore;
@@ -75,10 +72,6 @@ import org.apache.bookkeeper.stats.AlertStatsLogger;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZKUtil;
-import org.apache.zookeeper.ZooKeeper;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.util.HashedWheelTimer;
 import org.slf4j.Logger;
@@ -93,7 +86,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -120,13 +112,6 @@ import java.util.concurrent.TimeUnit;
 class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedLogManager {
     static final Logger LOG = LoggerFactory.getLogger(BKDistributedLogManager.class);
 
-    static void createLog(DistributedLogConfiguration conf, ZooKeeperClient zkc, URI uri, String streamName)
-            throws IOException, InterruptedException {
-        Future<ZKLogMetadataForWriter> createFuture = ZKLogMetadataForWriter.of(
-                        uri, streamName, conf.getUnpartitionedStreamName(), zkc.get(), zkc.getDefaultACL(), true, true);
-        FutureUtils.result(createFuture);
-    }
-
     static final Function<LogRecordWithDLSN, Long> RECORD_2_TXID_FUNCTION =
             new Function<LogRecordWithDLSN, Long>() {
                 @Override
@@ -158,12 +143,10 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
     private final StatsLogger perLogStatsLogger;
     private final AlertStatsLogger alertStatsLogger;
 
-    // lock factory
-    private SessionLockFactory lockFactory = null;
-
-    // log segment metadata stores
-    private final LogSegmentMetadataStore writerMetadataStore;
-    private final LogSegmentMetadataStore readerMetadataStore;
+    // log stream metadata stores
+    private final LogStreamMetadataStore writerMetadataStore;
+    private final LogStreamMetadataStore readerMetadataStore;
+    // log segment metadata cache
     private final LogSegmentMetadataCache logSegmentMetadataCache;
 
     // bookkeeper clients
@@ -183,9 +166,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
     //
     private final LedgerAllocator ledgerAllocator;
     private final PermitLimiter writeLimiter;
-    // Log Segment Rolling Manager to control rolling speed
-    private final PermitManager logSegmentRollingPermitManager;
-    private OrderedScheduler lockStateExecutor = null;
 
     //
     // Reader Related Variables
@@ -237,19 +217,16 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
              readerBKCBuilder,
              null,
              null,
-             null,
              new LogSegmentMetadataCache(conf, Ticker.systemTicker()),
              OrderedScheduler.newBuilder().name("BKDL-" + name).corePoolSize(1).build(),
              null,
              null,
              null,
-             null,
              new ReadAheadExceptionsLogger(statsLogger),
              DistributedLogConstants.UNKNOWN_CLIENT_ID,
              DistributedLogConstants.LOCAL_REGION_ID,
              null,
              writeLimiter,
-             PermitManager.UNLIMITED_PERMIT_MANAGER,
              featureProvider,
              statsLogger,
              NullStatsLogger.INSTANCE);
@@ -268,12 +245,10 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
      * @param zkcForReaderBKC zookeeper builder for bookkeeper shared by readers
      * @param writerBKCBuilder bookkeeper builder for writers
      * @param readerBKCBuilder bookkeeper builder for readers
-     * @param lockFactory distributed lock factory
      * @param writerMetadataStore writer metadata store
      * @param readerMetadataStore reader metadata store
      * @param scheduler ordered scheduled used by readers and writers
      * @param readAheadScheduler readAhead scheduler used by readers
-     * @param lockStateExecutor ordered scheduled used by locks to execute lock actions
      * @param channelFactory client socket channel factory to build bookkeeper clients
      * @param requestTimer request timer to build bookkeeper clients
      * @param readAheadExceptionsLogger stats logger to record readahead exceptions
@@ -297,13 +272,11 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
                             ZooKeeperClient zkcForReaderBKC,
                             BookKeeperClientBuilder writerBKCBuilder,
                             BookKeeperClientBuilder readerBKCBuilder,
-                            SessionLockFactory lockFactory,
-                            LogSegmentMetadataStore writerMetadataStore,
-                            LogSegmentMetadataStore readerMetadataStore,
+                            LogStreamMetadataStore writerMetadataStore,
+                            LogStreamMetadataStore readerMetadataStore,
                             LogSegmentMetadataCache logSegmentMetadataCache,
                             OrderedScheduler scheduler,
                             OrderedScheduler readAheadScheduler,
-                            OrderedScheduler lockStateExecutor,
                             ClientSocketChannelFactory channelFactory,
                             HashedWheelTimer requestTimer,
                             ReadAheadExceptionsLogger readAheadExceptionsLogger,
@@ -311,7 +284,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
                             Integer regionId,
                             LedgerAllocator ledgerAllocator,
                             PermitLimiter writeLimiter,
-                            PermitManager logSegmentRollingPermitManager,
                             FeatureProvider featureProvider,
                             StatsLogger statsLogger,
                             StatsLogger perLogStatsLogger) throws IOException {
@@ -320,8 +292,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
         this.conf = conf;
         this.dynConf = dynConf;
         this.scheduler = scheduler;
-        this.lockFactory = lockFactory;
-        this.lockStateExecutor = lockStateExecutor;
         this.readAheadScheduler = null == readAheadScheduler ? scheduler : readAheadScheduler;
         this.statsLogger = statsLogger;
         this.perLogStatsLogger = BroadCastStatsLogger.masterslave(perLogStatsLogger, statsLogger);
@@ -332,15 +302,24 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
         this.streamIdentifier = conf.getUnpartitionedStreamName();
         this.ledgerAllocator = ledgerAllocator;
         this.writeLimiter = writeLimiter;
-        this.logSegmentRollingPermitManager = logSegmentRollingPermitManager;
 
         if (null == writerMetadataStore) {
-            this.writerMetadataStore = new ZKLogSegmentMetadataStore(conf, writerZKC, scheduler);
+            this.writerMetadataStore = new ZKLogStreamMetadataStore(
+                    clientId,
+                    conf,
+                    writerZKC,
+                    scheduler,
+                    statsLogger);
         } else {
             this.writerMetadataStore = writerMetadataStore;
         }
         if (null == readerMetadataStore) {
-            this.readerMetadataStore = new ZKLogSegmentMetadataStore(conf, readerZKC, scheduler);
+            this.readerMetadataStore = new ZKLogStreamMetadataStore(
+                    clientId,
+                    conf,
+                    readerZKC,
+                    scheduler,
+                    statsLogger);
         } else {
             this.readerMetadataStore = readerMetadataStore;
         }
@@ -407,26 +386,13 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
         this.readAheadExceptionsLogger = readAheadExceptionsLogger;
     }
 
-    synchronized OrderedScheduler getLockStateExecutor(boolean createIfNull) {
-        if (createIfNull && null == lockStateExecutor && ownExecutor) {
-            lockStateExecutor = OrderedScheduler.newBuilder()
-                    .corePoolSize(1).name("BKDL-LockState").build();
-        }
-        return lockStateExecutor;
+    @VisibleForTesting
+    LogStreamMetadataStore getWriterMetadataStore() {
+        return writerMetadataStore;
     }
 
-    private synchronized SessionLockFactory getLockFactory(boolean createIfNull) {
-        if (createIfNull && null == lockFactory) {
-            lockFactory = new ZKSessionLockFactory(
-                    writerZKC,
-                    clientId,
-                    getLockStateExecutor(createIfNull),
-                    conf.getZKNumRetries(),
-                    conf.getLockTimeoutMilliSeconds(),
-                    conf.getZKRetryBackoffStartMillis(),
-                    statsLogger);
-        }
-        return lockFactory;
+    URI getUri() {
+        return uri;
     }
 
     DistributedLogConfiguration getConf() {
@@ -457,12 +423,16 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
         return this.featureProvider;
     }
 
-    private synchronized BKLogReadHandler getReadHandlerForListener(boolean create) {
+    private synchronized BKLogReadHandler getReadHandlerAndRegisterListener(
+            boolean create, LogSegmentListener listener) {
         if (null == readHandlerForListener && create) {
             readHandlerForListener = createReadHandler();
-            // start fetch the log segments
+            readHandlerForListener.registerListener(listener);
+            // start fetch the log segments after created the listener
             readHandlerForListener.asyncStartFetchLogSegments();
+            return readHandlerForListener;
         }
+        readHandlerForListener.registerListener(listener);
         return readHandlerForListener;
     }
 
@@ -483,8 +453,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
 
     @Override
     public void registerListener(LogSegmentListener listener) throws IOException {
-        BKLogReadHandler readHandler = getReadHandlerForListener(true);
-        readHandler.registerListener(listener);
+        getReadHandlerAndRegisterListener(true, listener);
     }
 
     @Override
@@ -523,14 +492,12 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
                                                     boolean isHandleForReading) {
         return createReadHandler(
                 subscriberId,
-                getLockStateExecutor(true),
                 null,
                 true, /* deserialize record set */
                 isHandleForReading);
     }
 
     synchronized BKLogReadHandler createReadHandler(Optional<String> subscriberId,
-                                                    OrderedScheduler lockExecutor,
                                                     AsyncNotification notification,
                                                     boolean deserializeRecordSet,
                                                     boolean isHandleForReading) {
@@ -540,12 +507,10 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
                 subscriberId,
                 conf,
                 dynConf,
-                readerZKCBuilder,
                 readerBKCBuilder,
                 readerMetadataStore,
                 logSegmentMetadataCache,
                 scheduler,
-                lockExecutor,
                 readAheadScheduler,
                 alertStatsLogger,
                 readAheadExceptionsLogger,
@@ -585,24 +550,15 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
     }
 
     Future<BKLogWriteHandler> asyncCreateWriteHandler(final boolean lockHandler) {
-        final ZooKeeper zk;
-        try {
-            zk = writerZKC.get();
-        } catch (InterruptedException e) {
-            LOG.error("Failed to initialize zookeeper client : ", e);
-            return Future.exception(new DLInterruptedException("Failed to initialize zookeeper client", e));
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            return Future.exception(FutureUtils.zkException(e, uri.getPath()));
-        }
-
         boolean ownAllocator = null == ledgerAllocator;
 
-        // Fetching Log Metadata
-        Future<ZKLogMetadataForWriter> metadataFuture =
-                ZKLogMetadataForWriter.of(uri, name, streamIdentifier,
-                        zk, writerZKC.getDefaultACL(),
-                        ownAllocator, conf.getCreateStreamIfNotExists() || ownAllocator);
-        return metadataFuture.flatMap(new AbstractFunction1<ZKLogMetadataForWriter, Future<BKLogWriteHandler>>() {
+        // Fetching Log Metadata (create if not exists)
+        return writerMetadataStore.getLog(
+                uri,
+                name,
+                ownAllocator,
+                conf.getCreateStreamIfNotExists() || ownAllocator
+        ).flatMap(new AbstractFunction1<ZKLogMetadataForWriter, Future<BKLogWriteHandler>>() {
             @Override
             public Future<BKLogWriteHandler> apply(ZKLogMetadataForWriter logMetadata) {
                 Promise<BKLogWriteHandler> createPromise = new Promise<BKLogWriteHandler>();
@@ -615,16 +571,10 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
     private void createWriteHandler(ZKLogMetadataForWriter logMetadata,
                                     boolean lockHandler,
                                     final Promise<BKLogWriteHandler> createPromise) {
-        OrderedScheduler lockStateExecutor = getLockStateExecutor(true);
         // Build the locks
         DistributedLock lock;
         if (conf.isWriteLockEnabled()) {
-            lock = new ZKDistributedLock(
-                    lockStateExecutor,
-                    getLockFactory(true),
-                    logMetadata.getLockPath(),
-                    conf.getLockTimeoutMilliSeconds(),
-                    statsLogger);
+            lock = writerMetadataStore.createWriteLock(logMetadata);
         } else {
             lock = NopDistributedLock.INSTANCE;
         }
@@ -641,7 +591,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
         final BKLogWriteHandler writeHandler = new BKLogWriteHandler(
                 logMetadata,
                 conf,
-                writerZKCBuilder,
                 writerBKCBuilder,
                 writerMetadataStore,
                 logSegmentMetadataCache,
@@ -656,10 +605,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
                 featureProvider,
                 dynConf,
                 lock);
-        PermitManager manager = getLogSegmentRollingPermitManager();
-        if (manager instanceof Watcher) {
-            writeHandler.register((Watcher) manager);
-        }
         if (lockHandler) {
             writeHandler.lockHandler().addEventListener(new FutureEventListener<DistributedLock>() {
                 @Override
@@ -684,7 +629,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
     }
 
     PermitManager getLogSegmentRollingPermitManager() {
-        return logSegmentRollingPermitManager;
+        return writerMetadataStore.getPermitManager();
     }
 
     <T> Future<T> processReaderOperation(final Function<BKLogReadHandler, Future<T>> func) {
@@ -692,7 +637,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
         return readerFuturePool.apply(new ExceptionalFunction0<BKLogReadHandler>() {
             @Override
             public BKLogReadHandler applyE() throws Throwable {
-                return getReadHandlerForListener(true);
+                return getReadHandlerAndRegisterListener(true, null);
             }
         }).flatMap(new ExceptionalFunction<BKLogReadHandler, Future<T>>() {
             @Override
@@ -982,7 +927,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
         AsyncLogReader reader = new BKAsyncLogReaderDLSN(
                 this,
                 scheduler,
-                getLockStateExecutor(true),
                 fromDLSN,
                 subscriberId,
                 false,
@@ -1022,7 +966,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
         final BKAsyncLogReaderDLSN reader = new BKAsyncLogReaderDLSN(
                 BKDistributedLogManager.this,
                 scheduler,
-                getLockStateExecutor(true),
                 fromDLSN.isPresent() ? fromDLSN.get() : DLSN.InitialDLSN,
                 subscriberId,
                 false,
@@ -1266,33 +1209,9 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
      */
     @Override
     public void delete() throws IOException {
-        BKLogWriteHandler ledgerHandler = createWriteHandler(true);
-        try {
-            ledgerHandler.deleteLog();
-        } finally {
-            Utils.closeQuietly(ledgerHandler);
-        }
-
-        // Delete the ZK path associated with the log stream
-        String zkPath = getZKPath();
-        // Safety check when we are using the shared zookeeper
-        if (zkPath.toLowerCase().contains("distributedlog")) {
-            try {
-                LOG.info("Delete the path associated with the log {}, ZK Path {}", name, zkPath);
-                ZKUtil.deleteRecursive(writerZKC.get(), zkPath);
-            } catch (InterruptedException ie) {
-                LOG.error("Interrupted while accessing ZK", ie);
-                throw new DLInterruptedException("Error initializing zk", ie);
-            } catch (KeeperException ke) {
-                LOG.error("Error accessing entry in zookeeper", ke);
-                throw new IOException("Error initializing zk", ke);
-            }
-        } else {
-            LOG.warn("Skip deletion of unrecognized ZK Path {}", zkPath);
-        }
+        FutureUtils.result(writerMetadataStore.deleteLog(uri, getStreamName()));
     }
 
-
     /**
      * The DistributedLogManager may archive/purge any logs for transactionId
      * less than or equal to minImageTxId.
@@ -1377,9 +1296,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
                         SchedulerUtils.shutdownScheduler(readAheadScheduler, schedTimeout, TimeUnit.MILLISECONDS);
                         LOG.info("Stopped BKDL ReadAhead Executor Service for {}.", name);
                     }
-
-                    SchedulerUtils.shutdownScheduler(getLockStateExecutor(false), schedTimeout, TimeUnit.MILLISECONDS);
-                    LOG.info("Stopped BKDL Lock State Executor for {}.", name);
                 }
                 if (ownWriterBKC) {
                     writerBKC.close();
@@ -1410,16 +1326,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
         FutureUtils.result(asyncClose());
     }
 
-    public boolean scheduleTask(Runnable task) {
-        try {
-            scheduler.submit(task);
-            return true;
-        } catch (RejectedExecutionException ree) {
-            LOG.error("Task {} is rejected : ", task, ree);
-            return false;
-        }
-    }
-
     private FuturePool buildFuturePool(ExecutorService executorService,
                                        StatsLogger statsLogger) {
         FuturePool futurePool = new ExecutorServiceFuturePool(executorService);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
index f8d347a..2c9fe44 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
@@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Ticker;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.twitter.distributedlog.DistributedLogManagerFactory.ClientSharingOption;
@@ -33,30 +32,24 @@ import com.twitter.distributedlog.bk.LedgerAllocatorUtils;
 import com.twitter.distributedlog.callback.NamespaceListener;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.exceptions.AlreadyClosedException;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
 import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
 import com.twitter.distributedlog.exceptions.LogNotFoundException;
-import com.twitter.distributedlog.exceptions.ZKException;
 import com.twitter.distributedlog.feature.CoreFeatureKeys;
 import com.twitter.distributedlog.impl.ZKLogMetadataStore;
-import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore;
 import com.twitter.distributedlog.impl.federated.FederatedZKLogMetadataStore;
-import com.twitter.distributedlog.lock.SessionLockFactory;
-import com.twitter.distributedlog.lock.ZKSessionLockFactory;
+import com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
-import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
 import com.twitter.distributedlog.metadata.BKDLConfig;
 import com.twitter.distributedlog.metadata.LogMetadataStore;
+import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
 import com.twitter.distributedlog.namespace.DistributedLogNamespace;
 import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger;
 import com.twitter.distributedlog.util.ConfUtils;
 import com.twitter.distributedlog.util.DLUtils;
 import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.LimitedPermitManager;
 import com.twitter.distributedlog.util.MonitoredScheduledThreadPoolExecutor;
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.PermitLimiter;
-import com.twitter.distributedlog.util.PermitManager;
 import com.twitter.distributedlog.util.SchedulerUtils;
 import com.twitter.distributedlog.util.SimplePermitLimiter;
 import com.twitter.distributedlog.util.Utils;
@@ -272,7 +265,6 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
     private final BKDLConfig bkdlConfig;
     private final OrderedScheduler scheduler;
     private final OrderedScheduler readAheadExecutor;
-    private final OrderedScheduler lockStateExecutor;
     private final ClientSocketChannelFactory channelFactory;
     private final HashedWheelTimer requestTimer;
     // zookeeper clients
@@ -300,16 +292,12 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
     private final LedgerAllocator allocator;
     // access control manager
     private AccessControlManager accessControlManager;
-    // log segment rolling permit manager
-    private final PermitManager logSegmentRollingPermitManager;
     // log metadata store
     private final LogMetadataStore metadataStore;
     // log segment metadata store
     private final LogSegmentMetadataCache logSegmentMetadataCache;
-    private final LogSegmentMetadataStore writerSegmentMetadataStore;
-    private final LogSegmentMetadataStore readerSegmentMetadataStore;
-    // lock factory
-    private final SessionLockFactory lockFactory;
+    private final LogStreamMetadataStore writerStreamMetadataStore;
+    private final LogStreamMetadataStore readerStreamMetadataStore;
 
     // feature provider
     private final FeatureProvider featureProvider;
@@ -371,15 +359,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
             this.readAheadExecutor = this.scheduler;
             LOG.info("Used shared executor for readahead.");
         }
-        StatsLogger lockStateStatsLogger = statsLogger.scope("factory").scope("lock_scheduler");
-        this.lockStateExecutor = OrderedScheduler.newBuilder()
-                .name("DLM-LockState")
-                .corePoolSize(conf.getNumLockStateThreads())
-                .statsLogger(lockStateStatsLogger)
-                .perExecutorStatsLogger(lockStateStatsLogger)
-                .traceTaskExecution(conf.getEnableTaskExecutionStats())
-                .traceTaskExecutionWarnTimeUs(conf.getTaskExecutionWarnTimeMicros())
-                .build();
+
         this.channelFactory = new NioClientSocketChannelFactory(
             Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-boss-%d").build()),
             Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-worker-%d").build()),
@@ -427,9 +407,6 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
         }
         this.readerBKC = this.sharedReaderBKCBuilder.build();
 
-        this.logSegmentRollingPermitManager = new LimitedPermitManager(
-                conf.getLogSegmentRollingConcurrency(), 1, TimeUnit.MINUTES, scheduler);
-
         if (conf.getGlobalOutstandingWriteLimit() < 0) {
             this.writeLimiter = PermitLimiter.NULL_PERMIT_LIMITER;
         } else {
@@ -458,15 +435,6 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
         } else {
             allocator = null;
         }
-        // Build the lock factory
-        this.lockFactory = new ZKSessionLockFactory(
-                sharedWriterZKCForDL,
-                clientId,
-                lockStateExecutor,
-                conf.getZKNumRetries(),
-                conf.getLockTimeoutMilliSeconds(),
-                conf.getZKRetryBackoffStartMillis(),
-                statsLogger);
 
         // Stats Loggers
         this.readAheadExceptionsLogger = new ReadAheadExceptionsLogger(statsLogger);
@@ -478,11 +446,22 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
             this.metadataStore = new ZKLogMetadataStore(conf, namespace, sharedReaderZKCForDL, scheduler);
         }
 
-        // create log segment metadata store
-        this.writerSegmentMetadataStore =
-                new ZKLogSegmentMetadataStore(conf, sharedWriterZKCForDL, scheduler);
-        this.readerSegmentMetadataStore =
-                new ZKLogSegmentMetadataStore(conf, sharedReaderZKCForDL, scheduler);
+        // create log stream metadata store
+        this.writerStreamMetadataStore =
+                new ZKLogStreamMetadataStore(
+                        clientId,
+                        conf,
+                        sharedWriterZKCForDL,
+                        scheduler,
+                        statsLogger);
+        this.readerStreamMetadataStore =
+                new ZKLogStreamMetadataStore(
+                        clientId,
+                        conf,
+                        sharedReaderZKCForDL,
+                        scheduler,
+                        statsLogger);
+        // create a log segment metadata cache
         this.logSegmentMetadataCache = new LogSegmentMetadataCache(conf, Ticker.systemTicker());
 
         LOG.info("Constructed BK DistributedLogNamespace : clientId = {}, regionId = {}, federated = {}.",
@@ -499,7 +478,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
         checkState();
         validateName(logName);
         URI uri = FutureUtils.result(metadataStore.createLog(logName));
-        createUnpartitionedStreams(conf, uri, Lists.newArrayList(logName));
+        FutureUtils.result(writerStreamMetadataStore.getLog(uri, logName, true, true));
     }
 
     @Override
@@ -556,7 +535,16 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
         throws IOException, IllegalArgumentException {
         checkState();
         Optional<URI> uri = FutureUtils.result(metadataStore.getLogLocation(logName));
-        return uri.isPresent() && checkIfLogExists(conf, uri.get(), logName);
+        if (uri.isPresent()) {
+            try {
+                FutureUtils.result(writerStreamMetadataStore.logExists(uri.get(), logName));
+                return true;
+            } catch (LogNotFoundException lnfe) {
+                return false;
+            }
+        } else {
+            return false;
+        }
     }
 
     @Override
@@ -701,8 +689,8 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
     }
 
     @VisibleForTesting
-    public LogSegmentMetadataStore getWriterSegmentMetadataStore() {
-        return writerSegmentMetadataStore;
+    public LogStreamMetadataStore getWriterStreamMetadataStore() {
+        return writerStreamMetadataStore;
     }
 
     @VisibleForTesting
@@ -883,10 +871,8 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
         }
 
         LedgerAllocator dlmLedgerAlloctor = null;
-        PermitManager dlmLogSegmentRollingPermitManager = PermitManager.UNLIMITED_PERMIT_MANAGER;
         if (ClientSharingOption.SharedClients == clientSharingOption) {
             dlmLedgerAlloctor = this.allocator;
-            dlmLogSegmentRollingPermitManager = this.logSegmentRollingPermitManager;
         }
         // if there's a specified perStreamStatsLogger, user it, otherwise use the default one.
         StatsLogger perLogStatsLogger = perStreamStatsLogger.or(this.perLogStatsLogger);
@@ -902,13 +888,11 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
                 readerZKCForBK,                     /* ZKC for BookKeeper for DL Readers */
                 writerBKCBuilder,                   /* BookKeeper Builder for DL Writers */
                 readerBKCBuilder,                   /* BookKeeper Builder for DL Readers */
-                lockFactory,                        /* Lock Factory */
-                writerSegmentMetadataStore,         /* Log Segment Metadata Store for DL Writers */
-                readerSegmentMetadataStore,         /* Log Segment Metadata Store for DL Readers */
+                writerStreamMetadataStore,          /* Log Stream Metadata Store for DL Writers */
+                readerStreamMetadataStore,          /* Log Stream Metadata Store for DL Readers */
                 logSegmentMetadataCache,            /* Log Segment Metadata Cache */
                 scheduler,                          /* DL scheduler */
                 readAheadExecutor,                  /* Read Aheader Executor */
-                lockStateExecutor,                  /* Lock State Executor */
                 channelFactory,                     /* Netty Channel Factory */
                 requestTimer,                       /* Request Timer */
                 readAheadExceptionsLogger,          /* ReadAhead Exceptions Logger */
@@ -916,7 +900,6 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
                 regionId,                           /* Region Id */
                 dlmLedgerAlloctor,                  /* Ledger Allocator */
                 writeLimiter,                       /* Write Limiter */
-                dlmLogSegmentRollingPermitManager,  /* Log segment rolling limiter */
                 featureProvider.scope("dl"),        /* Feature Provider */
                 statsLogger,                        /* Stats Logger */
                 perLogStatsLogger                   /* Per Log Stats Logger */
@@ -961,25 +944,6 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
         validateName(nameOfStream);
     }
 
-    private static boolean checkIfLogExists(DistributedLogConfiguration conf, URI uri, String name)
-        throws IOException, IllegalArgumentException {
-        validateInput(conf, uri, name);
-        final String logRootPath = uri.getPath() + String.format("/%s", name);
-        return withZooKeeperClient(new ZooKeeperClientHandler<Boolean>() {
-            @Override
-            public Boolean handle(ZooKeeperClient zkc) throws IOException {
-                // check existence after syncing
-                try {
-                    return null != Utils.sync(zkc, logRootPath).exists(logRootPath, false);
-                } catch (KeeperException e) {
-                    throw new ZKException("Error on checking if log " + logRootPath + " exists", e.code());
-                } catch (InterruptedException e) {
-                    throw new DLInterruptedException("Interrupted on checking if log " + logRootPath + " exists", e);
-                }
-            }
-        }, conf, uri);
-    }
-
     public static Map<String, byte[]> enumerateLogsWithMetadataInNamespace(final DistributedLogConfiguration conf, final URI uri)
         throws IOException, IllegalArgumentException {
         return withZooKeeperClient(new ZooKeeperClientHandler<Map<String, byte[]>>() {
@@ -1025,27 +989,6 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
         return result;
     }
 
-    private static void createUnpartitionedStreams(
-            final DistributedLogConfiguration conf,
-            final URI uri,
-            final List<String> streamNames)
-        throws IOException, IllegalArgumentException {
-        withZooKeeperClient(new ZooKeeperClientHandler<Void>() {
-            @Override
-            public Void handle(ZooKeeperClient zkc) throws IOException {
-                for (String s : streamNames) {
-                    try {
-                        BKDistributedLogManager.createLog(conf, zkc, uri, s);
-                    } catch (InterruptedException e) {
-                        LOG.error("Interrupted on creating unpartitioned stream {} : ", s, e);
-                        return null;
-                    }
-                }
-                return null;
-            }
-        }, conf, uri);
-    }
-
     private void checkState() throws IOException {
         if (closed.get()) {
             LOG.error("BKDistributedLogNamespace {} is already closed", namespace);
@@ -1079,13 +1022,11 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
             LOG.info("Ledger Allocator stopped.");
         }
 
-        // Unregister gauge to avoid GC spiral
-        this.logSegmentRollingPermitManager.close();
         this.writeLimiter.close();
 
         // Shutdown log segment metadata stores
-        Utils.close(writerSegmentMetadataStore);
-        Utils.close(readerSegmentMetadataStore);
+        Utils.close(writerStreamMetadataStore);
+        Utils.close(readerStreamMetadataStore);
 
         // Shutdown the schedulers
         SchedulerUtils.shutdownScheduler(scheduler, conf.getSchedulerShutdownTimeoutMs(),
@@ -1113,7 +1054,5 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
         LOG.info("Release external resources used by channel factory.");
         requestTimer.stop();
         LOG.info("Stopped request timer");
-        SchedulerUtils.shutdownScheduler(lockStateExecutor, 5000, TimeUnit.MILLISECONDS);
-        LOG.info("Stopped lock state executor");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
index 2a6e85b..4f138f2 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
@@ -20,12 +20,9 @@ package com.twitter.distributedlog;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.twitter.distributedlog.callback.LogSegmentNamesListener;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
 import com.twitter.distributedlog.exceptions.LogEmptyException;
-import com.twitter.distributedlog.exceptions.LogNotFoundException;
 import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.exceptions.ZKException;
 import com.twitter.distributedlog.impl.metadata.ZKLogMetadata;
 import com.twitter.distributedlog.io.AsyncAbortable;
 import com.twitter.distributedlog.io.AsyncCloseable;
@@ -33,6 +30,7 @@ import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
 import com.twitter.distributedlog.logsegment.PerStreamLogSegmentCache;
 import com.twitter.distributedlog.logsegment.LogSegmentFilter;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
+import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.util.Function;
@@ -45,10 +43,6 @@ import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.runtime.AbstractFunction0;
@@ -95,8 +89,8 @@ public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
 
     protected final ZKLogMetadata logMetadata;
     protected final DistributedLogConfiguration conf;
-    protected final ZooKeeperClient zooKeeperClient;
     protected final BookKeeperClient bookKeeperClient;
+    protected final LogStreamMetadataStore streamMetadataStore;
     protected final LogSegmentMetadataStore metadataStore;
     protected final LogSegmentMetadataCache metadataCache;
     protected final int firstNumEntriesPerReadLastRecordScan;
@@ -112,8 +106,6 @@ public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
     // Maintain the list of log segments per stream
     protected final PerStreamLogSegmentCache logSegmentCache;
 
-
-
     // trace
     protected final long metadataLatencyWarnThresholdMillis;
 
@@ -130,15 +122,13 @@ public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
      */
     BKLogHandler(ZKLogMetadata metadata,
                  DistributedLogConfiguration conf,
-                 ZooKeeperClientBuilder zkcBuilder,
                  BookKeeperClientBuilder bkcBuilder,
-                 LogSegmentMetadataStore metadataStore,
+                 LogStreamMetadataStore streamMetadataStore,
                  LogSegmentMetadataCache metadataCache,
                  OrderedScheduler scheduler,
                  StatsLogger statsLogger,
                  AlertStatsLogger alertStatsLogger,
                  String lockClientId) {
-        Preconditions.checkNotNull(zkcBuilder);
         Preconditions.checkNotNull(bkcBuilder);
         this.logMetadata = metadata;
         this.conf = conf;
@@ -148,13 +138,11 @@ public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
         this.logSegmentCache = new PerStreamLogSegmentCache(
                 metadata.getLogName(),
                 conf.isLogSegmentSequenceNumberValidationEnabled());
-
         firstNumEntriesPerReadLastRecordScan = conf.getFirstNumEntriesPerReadLastRecordScan();
         maxNumEntriesPerReadLastRecordScan = conf.getMaxNumEntriesPerReadLastRecordScan();
-        this.zooKeeperClient = zkcBuilder.build();
-        LOG.debug("Using ZK Path {}", logMetadata.getLogRootPath());
         this.bookKeeperClient = bkcBuilder.build();
-        this.metadataStore = metadataStore;
+        this.streamMetadataStore = streamMetadataStore;
+        this.metadataStore = streamMetadataStore.getLogSegmentMetadataStore();
         this.metadataCache = metadataCache;
         this.lockClientId = lockClientId;
 
@@ -188,7 +176,8 @@ public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
 
     public Future<LogRecordWithDLSN> asyncGetFirstLogRecord() {
         final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>();
-        checkLogStreamExistsAsync().addEventListener(new FutureEventListener<Void>() {
+        streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName())
+                .addEventListener(new FutureEventListener<Void>() {
             @Override
             public void onSuccess(Void value) {
                 readLogSegmentsFromStore(
@@ -234,7 +223,8 @@ public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
 
     public Future<LogRecordWithDLSN> getLastLogRecordAsync(final boolean recover, final boolean includeEndOfStream) {
         final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>();
-        checkLogStreamExistsAsync().addEventListener(new FutureEventListener<Void>() {
+        streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName())
+                .addEventListener(new FutureEventListener<Void>() {
             @Override
             public void onSuccess(Void value) {
                 readLogSegmentsFromStore(
@@ -381,8 +371,8 @@ public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
      * @return the count of records present in the range
      */
     public Future<Long> asyncGetLogRecordCount(final DLSN beginDLSN) {
-
-        return checkLogStreamExistsAsync().flatMap(new Function<Void, Future<Long>>() {
+        return streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName())
+                .flatMap(new Function<Void, Future<Long>>() {
             public Future<Long> apply(Void done) {
 
                 return readLogSegmentsFromStore(
@@ -417,48 +407,6 @@ public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
         return sum;
     }
 
-    Future<Void> checkLogStreamExistsAsync() {
-        final Promise<Void> promise = new Promise<Void>();
-        try {
-            final ZooKeeper zk = zooKeeperClient.get();
-            zk.sync(logMetadata.getLogSegmentsPath(), new AsyncCallback.VoidCallback() {
-                @Override
-                public void processResult(int syncRc, String path, Object syncCtx) {
-                    if (KeeperException.Code.NONODE.intValue() == syncRc) {
-                        promise.setException(new LogNotFoundException(
-                                String.format("Log %s does not exist or has been deleted", getFullyQualifiedName())));
-                        return;
-                    } else if (KeeperException.Code.OK.intValue() != syncRc){
-                        promise.setException(new ZKException("Error on checking log existence for " + getFullyQualifiedName(),
-                                KeeperException.create(KeeperException.Code.get(syncRc))));
-                        return;
-                    }
-                    zk.exists(logMetadata.getLogSegmentsPath(), false, new AsyncCallback.StatCallback() {
-                        @Override
-                        public void processResult(int rc, String path, Object ctx, Stat stat) {
-                            if (KeeperException.Code.OK.intValue() == rc) {
-                                promise.setValue(null);
-                            } else if (KeeperException.Code.NONODE.intValue() == rc) {
-                                promise.setException(new LogNotFoundException(String.format("Log %s does not exist or has been deleted", getFullyQualifiedName())));
-                            } else {
-                                promise.setException(new ZKException("Error on checking log existence for " + getFullyQualifiedName(),
-                                        KeeperException.create(KeeperException.Code.get(rc))));
-                            }
-                        }
-                    }, null);
-                }
-            }, null);
-
-        } catch (InterruptedException ie) {
-            LOG.error("Interrupted while reading {}", logMetadata.getLogSegmentsPath(), ie);
-            promise.setException(new DLInterruptedException("Interrupted while checking "
-                    + logMetadata.getLogSegmentsPath(), ie));
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
-        }
-        return promise;
-    }
-
     @Override
     public Future<Void> asyncAbort() {
         return asyncClose();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
index 30a96ff..1963172 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
@@ -31,8 +31,6 @@ import com.twitter.distributedlog.callback.LogSegmentListener;
 import com.twitter.distributedlog.callback.LogSegmentNamesListener;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.exceptions.DLIllegalStateException;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.exceptions.LockCancelledException;
 import com.twitter.distributedlog.exceptions.LockingException;
 import com.twitter.distributedlog.exceptions.LogNotFoundException;
 import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
@@ -40,12 +38,9 @@ import com.twitter.distributedlog.exceptions.UnexpectedException;
 import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
 import com.twitter.distributedlog.injector.AsyncFailureInjector;
 import com.twitter.distributedlog.lock.DistributedLock;
-import com.twitter.distributedlog.lock.SessionLockFactory;
-import com.twitter.distributedlog.lock.ZKDistributedLock;
-import com.twitter.distributedlog.lock.ZKSessionLockFactory;
 import com.twitter.distributedlog.logsegment.LogSegmentFilter;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
-import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
+import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
 import com.twitter.distributedlog.readahead.ReadAheadWorker;
 import com.twitter.distributedlog.stats.BroadCastStatsLogger;
 import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger;
@@ -53,7 +48,6 @@ import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.Utils;
 import com.twitter.util.ExceptionalFunction;
-import com.twitter.util.ExceptionalFunction0;
 import com.twitter.util.Function;
 import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
@@ -67,14 +61,13 @@ import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Function0;
 import scala.runtime.AbstractFunction1;
 import scala.runtime.BoxedUnit;
 
+import javax.annotation.Nullable;
+
 /**
  * Log Handler for Readers.
  * <h3>Metrics</h3>
@@ -111,7 +104,7 @@ import scala.runtime.BoxedUnit;
  * becoming idle.
  * </ul>
  * <h4>Read Lock</h4>
- * All read lock related stats are exposed under scope `read_lock`. See {@link ZKDistributedLock}
+ * All read lock related stats are exposed under scope `read_lock`. See the {@link DistributedLock} implementation
  * for detail stats.
  */
 class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
@@ -126,10 +119,7 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
     protected ReadAheadWorker readAheadWorker = null;
     private final boolean isHandleForReading;
 
-    private final SessionLockFactory lockFactory;
-    private final OrderedScheduler lockStateExecutor;
     private final Optional<String> subscriberId;
-    private final String readLockPath;
     private DistributedLock readLock;
     private Future<Void> lockAcquireFuture;
 
@@ -156,12 +146,10 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
                      Optional<String> subscriberId,
                      DistributedLogConfiguration conf,
                      DynamicDistributedLogConfiguration dynConf,
-                     ZooKeeperClientBuilder zkcBuilder,
                      BookKeeperClientBuilder bkcBuilder,
-                     LogSegmentMetadataStore metadataStore,
+                     LogStreamMetadataStore streamMetadataStore,
                      LogSegmentMetadataCache metadataCache,
                      OrderedScheduler scheduler,
-                     OrderedScheduler lockStateExecutor,
                      OrderedScheduler readAheadExecutor,
                      AlertStatsLogger alertStatsLogger,
                      ReadAheadExceptionsLogger readAheadExceptionsLogger,
@@ -173,9 +161,8 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
                      boolean deserializeRecordSet) {
         super(logMetadata,
                 conf,
-                zkcBuilder,
                 bkcBuilder,
-                metadataStore,
+                streamMetadataStore,
                 metadataCache,
                 scheduler,
                 statsLogger,
@@ -206,23 +193,12 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
                 Ticker.systemTicker());
 
         this.subscriberId = subscriberId;
-        this.readLockPath = logMetadata.getReadLockPath(subscriberId);
-        this.lockStateExecutor = lockStateExecutor;
-        this.lockFactory = new ZKSessionLockFactory(
-                zooKeeperClient,
-                getLockClientId(),
-                lockStateExecutor,
-                conf.getZKNumRetries(),
-                conf.getLockTimeoutMilliSeconds(),
-                conf.getZKRetryBackoffStartMillis(),
-                statsLogger.scope("read_lock"));
-
         this.isHandleForReading = isHandleForReading;
     }
 
     @VisibleForTesting
     String getReadLockPath() {
-        return readLockPath;
+        return logMetadataForReader.getReadLockPath(subscriberId);
     }
 
     <T> void satisfyPromiseAsync(final Promise<T> promise, final Try<T> result) {
@@ -234,38 +210,24 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
         });
     }
 
+    Future<Void> checkLogStreamExists() {
+        return streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName());
+    }
+
     /**
      * Elective stream lock--readers are not required to acquire the lock before using the stream.
      */
     synchronized Future<Void> lockStream() {
         if (null == lockAcquireFuture) {
-            final Function0<DistributedLock> lockFunction =  new ExceptionalFunction0<DistributedLock>() {
-                @Override
-                public DistributedLock applyE() throws IOException {
-                    // Unfortunately this has a blocking call which we should not execute on the
-                    // ZK completion thread
-                    BKLogReadHandler.this.readLock = new ZKDistributedLock(
-                            lockStateExecutor,
-                            lockFactory,
-                            readLockPath,
-                            conf.getLockTimeoutMilliSeconds(),
-                            statsLogger.scope("read_lock"));
-
-                    LOG.info("acquiring readlock {} at {}", getLockClientId(), readLockPath);
-                    return BKLogReadHandler.this.readLock;
-                }
-            };
-            lockAcquireFuture = ensureReadLockPathExist().flatMap(new ExceptionalFunction<Void, Future<Void>>() {
-                @Override
-                public Future<Void> applyE(Void in) throws Throwable {
-                    return scheduler.apply(lockFunction).flatMap(new ExceptionalFunction<DistributedLock, Future<Void>>() {
+            lockAcquireFuture = streamMetadataStore.createReadLock(logMetadataForReader, subscriberId)
+                    .flatMap(new ExceptionalFunction<DistributedLock, Future<Void>>() {
                         @Override
-                        public Future<Void> applyE(DistributedLock lock) throws IOException {
+                        public Future<Void> applyE(DistributedLock lock) throws Throwable {
+                            BKLogReadHandler.this.readLock = lock;
+                            LOG.info("acquiring readlock {} at {}", getLockClientId(), getReadLockPath());
                             return acquireLockOnExecutorThread(lock);
                         }
                     });
-                }
-            });
         }
         return lockAcquireFuture;
     }
@@ -292,14 +254,14 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
         acquireFuture.addEventListener(new FutureEventListener<DistributedLock>() {
             @Override
             public void onSuccess(DistributedLock lock) {
-                LOG.info("acquired readlock {} at {}", getLockClientId(), readLockPath);
+                LOG.info("acquired readlock {} at {}", getLockClientId(), getReadLockPath());
                 satisfyPromiseAsync(threadAcquirePromise, new Return<Void>(null));
             }
 
             @Override
             public void onFailure(Throwable cause) {
                 LOG.info("failed to acquire readlock {} at {}",
-                        new Object[]{getLockClientId(), readLockPath, cause});
+                        new Object[]{ getLockClientId(), getReadLockPath(), cause });
                 satisfyPromiseAsync(threadAcquirePromise, new Throw<Void>(cause));
             }
         });
@@ -438,46 +400,6 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
         return handleCache;
     }
 
-    private Future<Void> ensureReadLockPathExist() {
-        final Promise<Void> promise = new Promise<Void>();
-        promise.setInterruptHandler(new com.twitter.util.Function<Throwable, BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(Throwable t) {
-                FutureUtils.setException(promise, new LockCancelledException(readLockPath, "Could not ensure read lock path", t));
-                return null;
-            }
-        });
-        Optional<String> parentPathShouldNotCreate = Optional.of(logMetadata.getLogRootPath());
-        Utils.zkAsyncCreateFullPathOptimisticRecursive(zooKeeperClient, readLockPath, parentPathShouldNotCreate,
-                new byte[0], zooKeeperClient.getDefaultACL(), CreateMode.PERSISTENT,
-                new org.apache.zookeeper.AsyncCallback.StringCallback() {
-                    @Override
-                    public void processResult(final int rc, final String path, Object ctx, String name) {
-                        scheduler.submit(new Runnable() {
-                            @Override
-                            public void run() {
-                                if (KeeperException.Code.NONODE.intValue() == rc) {
-                                    FutureUtils.setException(promise, new LogNotFoundException(String.format("Log %s does not exist or has been deleted", getFullyQualifiedName())));
-                                } else if (KeeperException.Code.OK.intValue() == rc) {
-                                    FutureUtils.setValue(promise, null);
-                                    LOG.trace("Created path {}.", path);
-                                } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
-                                    FutureUtils.setValue(promise, null);
-                                    LOG.trace("Path {} is already existed.", path);
-                                } else if (DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE == rc) {
-                                    FutureUtils.setException(promise, new ZooKeeperClient.ZooKeeperConnectionException(path));
-                                } else if (DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) {
-                                    FutureUtils.setException(promise, new DLInterruptedException(path));
-                                } else {
-                                    FutureUtils.setException(promise, KeeperException.create(KeeperException.Code.get(rc)));
-                                }
-                            }
-                        });
-                    }
-                }, null);
-        return promise;
-    }
-
     public Entry.Reader getNextReadAheadEntry() throws IOException {
         return readAheadCache.getNextReadAheadEntry();
     }
@@ -560,12 +482,16 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
     // Listener for log segments
     //
 
-    protected void registerListener(LogSegmentListener listener) {
-        listeners.add(listener);
+    protected void registerListener(@Nullable LogSegmentListener listener) {
+        if (null != listener) {
+            listeners.add(listener);
+        }
     }
 
-    protected void unregisterListener(LogSegmentListener listener) {
-        listeners.remove(listener);
+    protected void unregisterListener(@Nullable LogSegmentListener listener) {
+        if (null != listener) {
+            listeners.remove(listener);
+        }
     }
 
     protected void notifyUpdatedLogSegments(List<LogSegmentMetadata> segments) {


Mime
View raw message