bookkeeper-distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [24/31] incubator-distributedlog git commit: DL-163: clean up direct zookeeper and bookkeeper usage and use metadata/data store abstraction
Date Fri, 30 Dec 2016 00:07:38 GMT
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
index b0a38cf..c403e26 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
@@ -17,12 +17,12 @@
  */
 package com.twitter.distributedlog;
 
+import com.twitter.distributedlog.impl.BKNamespaceDriver;
 import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryWriter;
-import com.twitter.distributedlog.logsegment.LogSegmentFilter;
-import com.twitter.distributedlog.metadata.BKDLConfig;
-import com.twitter.distributedlog.metadata.DLMetadata;
+import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
 import com.twitter.distributedlog.namespace.DistributedLogNamespace;
 import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import com.twitter.distributedlog.namespace.NamespaceDriver;
 import com.twitter.distributedlog.util.ConfUtils;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.PermitLimiter;
@@ -35,11 +35,7 @@ import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.feature.SettableFeatureProvider;
-import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.versioning.Version;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -89,11 +85,6 @@ public class DLMTestUtil {
         return segments;
     }
 
-    static void updateBKDLConfig(URI uri, String zkServers, String ledgersPath, boolean sanityCheckTxnID) throws Exception {
-        BKDLConfig bkdlConfig = new BKDLConfig(zkServers, ledgersPath).setSanityCheckTxnID(sanityCheckTxnID);
-        DLMetadata.create(bkdlConfig).update(uri);
-    }
-
     public static URI createDLMURI(int port, String path) throws Exception {
         return LocalDLMEmulator.createDLMURI("127.0.0.1:" + port, path);
     }
@@ -111,93 +102,18 @@ public class DLMTestUtil {
                                                       URI uri) throws Exception {
         // TODO: Metadata Accessor seems to be a legacy object which only used by kestrel
         //       (we might consider deprecating this)
-        BKDistributedLogNamespace namespace = BKDistributedLogNamespace.newBuilder()
+        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
                 .conf(conf).uri(uri).build();
-        return namespace.createMetadataAccessor(name);
-    }
-
-    public static class BKLogPartitionWriteHandlerAndClients {
-        private BKLogWriteHandler writeHandler;
-        private ZooKeeperClient zooKeeperClient;
-        private BookKeeperClient bookKeeperClient;
-
-        public BKLogPartitionWriteHandlerAndClients(BKLogWriteHandler writeHandler, ZooKeeperClient zooKeeperClient, BookKeeperClient bookKeeperClient) {
-            this.writeHandler = writeHandler;
-            this.zooKeeperClient = zooKeeperClient;
-            this.bookKeeperClient = bookKeeperClient;
-        }
-
-        public void close() {
-            bookKeeperClient.close();
-            zooKeeperClient.close();
-            Utils.closeQuietly(writeHandler);
-        }
-
-        public BKLogWriteHandler getWriteHandler() {
-            return writeHandler;
-        }
-    }
-
-    static BKLogPartitionWriteHandlerAndClients createNewBKDLM(DistributedLogConfiguration conf,
-                                                               String logName,
-                                                               int zkPort) throws Exception {
-        URI uri = createDLMURI(zkPort, "/" + logName);
-
-        ZooKeeperClientBuilder zkcBuilder = TestZooKeeperClientBuilder.newBuilder(conf)
-            .name(String.format("dlzk:%s:handler_dedicated", logName))
-            .uri(uri);
-
-        ZooKeeperClient zkClient = zkcBuilder.build();
-
-        try {
-            zkClient.get().create(uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        } catch (KeeperException.NodeExistsException nee) {
-            // ignore
-        }
-
-        // resolve uri
-        BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkClient, uri);
-        BKDLConfig.propagateConfiguration(bkdlConfig, conf);
-        BookKeeperClientBuilder bkcBuilder = BookKeeperClientBuilder.newBuilder()
-            .dlConfig(conf)
-            .name(String.format("bk:%s:handler_dedicated", logName))
-            .zkServers(bkdlConfig.getBkZkServersForWriter())
-            .ledgersPath(bkdlConfig.getBkLedgersPath())
-            .statsLogger(NullStatsLogger.INSTANCE);
-
-        BKDistributedLogManager bkdlm = new BKDistributedLogManager(
-                logName,
-                conf,
-                uri,
-                zkcBuilder,
-                zkcBuilder,
-                zkClient,
-                zkClient,
-                bkcBuilder,
-                bkcBuilder,
-                new SettableFeatureProvider("", 0),
-                PermitLimiter.NULL_PERMIT_LIMITER,
-                NullStatsLogger.INSTANCE);
-
-        BKLogWriteHandler writeHandler = bkdlm.createWriteHandler(true);
-        return new BKLogPartitionWriteHandlerAndClients(writeHandler, zkClient, bkcBuilder.build());
+        return namespace.getNamespaceDriver().getMetadataAccessor(name);
     }
 
     public static void fenceStream(DistributedLogConfiguration conf, URI uri, String name) throws Exception {
-        BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(name, conf, uri);
+        DistributedLogManager dlm = createNewDLM(name, conf, uri);
         try {
-            BKLogReadHandler readHandler = dlm.createReadHandler();
-            List<LogSegmentMetadata> ledgerList = FutureUtils.result(
-                    readHandler.readLogSegmentsFromStore(
-                            LogSegmentMetadata.COMPARATOR,
-                            LogSegmentFilter.DEFAULT_FILTER,
-                            null)
-            ).getValue();
-            LogSegmentMetadata lastSegment = ledgerList.get(ledgerList.size() - 1);
-            BookKeeperClient bkc = dlm.getWriterBKC();
-            LedgerHandle lh = bkc.get().openLedger(lastSegment.getLogSegmentId(),
-                    BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8));
-            lh.close();
+            List<LogSegmentMetadata> logSegmentList = dlm.getLogSegments();
+            LogSegmentMetadata lastSegment = logSegmentList.get(logSegmentList.size() - 1);
+            LogSegmentEntryStore entryStore = dlm.getNamespaceDriver().getLogSegmentEntryStore(NamespaceDriver.Role.READER);
+            Utils.close(FutureUtils.result(entryStore.openRandomAccessReader(lastSegment, true)));
         } finally {
             dlm.close();
         }
@@ -409,6 +325,14 @@ public class DLMTestUtil {
         return txid - startTxid;
     }
 
+    public static ZooKeeperClient getZooKeeperClient(BKDistributedLogManager dlm) {
+        return ((BKNamespaceDriver) dlm.getNamespaceDriver()).getWriterZKC();
+    }
+
+    public static BookKeeperClient getBookKeeperClient(BKDistributedLogManager dlm) {
+        return ((BKNamespaceDriver) dlm.getNamespaceDriver()).getReaderBKC();
+    }
+
     public static void injectLogSegmentWithGivenLogSegmentSeqNo(DistributedLogManager manager, DistributedLogConfiguration conf,
                                                                 long logSegmentSeqNo, long startTxID, boolean writeEntries, long segmentSize,
                                                                 boolean completeLogSegment)
@@ -417,7 +341,7 @@ public class DLMTestUtil {
         BKLogWriteHandler writeHandler = dlm.createWriteHandler(false);
         FutureUtils.result(writeHandler.lockHandler());
         // Start a log segment with a given ledger seq number.
-        BookKeeperClient bkc = dlm.getWriterBKC();
+        BookKeeperClient bkc = getBookKeeperClient(dlm);
         LedgerHandle lh = bkc.get().createLedger(conf.getEnsembleSize(), conf.getWriteQuorumSize(),
                 conf.getAckQuorumSize(), BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes());
         String inprogressZnodeName = writeHandler.inprogressZNodeName(lh.getId(), startTxID, logSegmentSeqNo);
@@ -429,7 +353,7 @@ public class DLMTestUtil {
                 .setLogSegmentSequenceNo(logSegmentSeqNo)
                 .setEnvelopeEntries(LogSegmentMetadata.supportsEnvelopedEntries(logSegmentMetadataVersion))
                 .build();
-        l.write(dlm.writerZKC);
+        l.write(getZooKeeperClient(dlm));
         writeHandler.maxTxId.update(Version.ANY, startTxID);
         writeHandler.addLogSegmentToCache(inprogressZnodeName, l);
         BKLogSegmentWriter writer = new BKLogSegmentWriter(
@@ -468,7 +392,7 @@ public class DLMTestUtil {
         BKLogWriteHandler writeHandler = dlm.createWriteHandler(false);
         FutureUtils.result(writeHandler.lockHandler());
         // Start a log segment with a given ledger seq number.
-        BookKeeperClient bkc = dlm.getReaderBKC();
+        BookKeeperClient bkc = getBookKeeperClient(dlm);
         LedgerHandle lh = bkc.get().createLedger(conf.getEnsembleSize(), conf.getWriteQuorumSize(),
                 conf.getAckQuorumSize(), BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes());
         String inprogressZnodeName = writeHandler.inprogressZNodeName(lh.getId(), startTxID, logSegmentSeqNo);
@@ -479,7 +403,7 @@ public class DLMTestUtil {
             .setLogSegmentSequenceNo(logSegmentSeqNo)
             .setInprogress(false)
             .build();
-        l.write(dlm.writerZKC);
+        l.write(getZooKeeperClient(dlm));
         writeHandler.maxTxId.update(Version.ANY, startTxID);
         writeHandler.addLogSegmentToCache(inprogressZnodeName, l);
         BKLogSegmentWriter writer = new BKLogSegmentWriter(

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java
index a6cffbb..124ea77 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java
@@ -29,9 +29,11 @@ import java.util.concurrent.atomic.AtomicReference;
 import com.twitter.distributedlog.exceptions.LockCancelledException;
 import com.twitter.distributedlog.exceptions.LockingException;
 import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
+import com.twitter.distributedlog.impl.BKNamespaceDriver;
 import com.twitter.distributedlog.lock.LockClosedException;
 import com.twitter.distributedlog.namespace.DistributedLogNamespace;
 import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import com.twitter.distributedlog.namespace.NamespaceDriver;
 import com.twitter.distributedlog.subscription.SubscriptionsStore;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.Utils;
@@ -85,7 +87,8 @@ public class TestAsyncReaderLock extends TestDistributedLogBase {
         Utils.close(reader1);
 
         // simulate a old stream created without readlock path
-        writer.bkDistributedLogManager.getWriterZKC().get().delete(readLockPath, -1);
+        NamespaceDriver driver = dlm.getNamespaceDriver();
+        ((BKNamespaceDriver) driver).getWriterZKC().get().delete(readLockPath, -1);
         Future<AsyncLogReader> futureReader2 = dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
         AsyncLogReader reader2 = Await.result(futureReader2);
         record = Await.result(reader2.readNext());
@@ -230,7 +233,7 @@ public class TestAsyncReaderLock extends TestDistributedLogBase {
 
         DistributedLogManager dlm1 = createNewDLM(conf, name);
         Future<AsyncLogReader> futureReader1 = dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
-        Await.result(futureReader1);
+        AsyncLogReader reader1 = Await.result(futureReader1);
 
         BKDistributedLogManager dlm2 = (BKDistributedLogManager) createNewDLM(conf, name);
         Future<AsyncLogReader> futureReader2 = dlm2.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
@@ -243,6 +246,7 @@ public class TestAsyncReaderLock extends TestDistributedLogBase {
         } catch (LockCancelledException ex) {
         }
 
+        Utils.close(reader1);
         dlm0.close();
         dlm1.close();
     }
@@ -250,16 +254,26 @@ public class TestAsyncReaderLock extends TestDistributedLogBase {
     @Test(timeout = 60000)
     public void testReaderLockSessionExpires() throws Exception {
         String name = runtime.getMethodName();
-        DistributedLogManager dlm0 = createNewDLM(conf, name);
+        URI uri = createDLMURI("/" + name);
+        ensureURICreated(uri);
+        DistributedLogNamespace ns0 = DistributedLogNamespaceBuilder.newBuilder()
+                .conf(conf)
+                .uri(uri)
+                .build();
+        DistributedLogManager dlm0 = ns0.openLog(name);
         BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm0.startAsyncLogSegmentNonPartitioned());
         writer.write(DLMTestUtil.getLogRecordInstance(1L));
         writer.write(DLMTestUtil.getLogRecordInstance(2L));
         writer.closeAndComplete();
 
-        DistributedLogManager dlm1 = createNewDLM(conf, name);
+        DistributedLogNamespace ns1 = DistributedLogNamespaceBuilder.newBuilder()
+                .conf(conf)
+                .uri(uri)
+                .build();
+        DistributedLogManager dlm1 = ns1.openLog(name);
         Future<AsyncLogReader> futureReader1 = dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
         AsyncLogReader reader1 = Await.result(futureReader1);
-        ZooKeeperClientUtils.expireSession(((BKDistributedLogManager)dlm1).getWriterZKC(), zkServers, 1000);
+        ZooKeeperClientUtils.expireSession(((BKNamespaceDriver) ns1.getNamespaceDriver()).getWriterZKC(), zkServers, 1000);
 
         // The result of expireSession is somewhat non-deterministic with this lock.
         // It may fail with LockingException or it may succesfully reacquire, so for
@@ -276,7 +290,9 @@ public class TestAsyncReaderLock extends TestDistributedLogBase {
 
         Utils.close(reader1);
         dlm0.close();
+        ns0.close();
         dlm1.close();
+        ns1.close();
     }
 
     @Test(timeout = 60000)
@@ -511,8 +527,11 @@ public class TestAsyncReaderLock extends TestDistributedLogBase {
         Utils.close(Await.result(futureReader3));
 
         dlm1.close();
+        namespace1.close();
         dlm2.close();
+        namespace2.close();
         dlm3.close();
+        namespace3.close();
 
         executorService.shutdown();
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
index 41adbb9..46c8523 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
@@ -36,6 +36,7 @@ import com.twitter.distributedlog.config.ConcurrentConstConfiguration;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.exceptions.BKTransmitException;
 import com.twitter.distributedlog.exceptions.LockingException;
+import com.twitter.distributedlog.impl.BKNamespaceDriver;
 import com.twitter.distributedlog.io.CompressionCodec;
 import com.twitter.distributedlog.util.Utils;
 import com.twitter.util.Promise;
@@ -1270,8 +1271,9 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
 
         BKLogSegmentWriter logWriter = writer.getCachedLogWriter();
 
+        BKNamespaceDriver driver = (BKNamespaceDriver) dlm.getNamespaceDriver();
         // fence the ledger
-        dlm.getWriterBKC().get().openLedger(logWriter.getLogSegmentId(),
+        driver.getReaderBKC().get().openLedger(logWriter.getLogSegmentId(),
                 BookKeeper.DigestType.CRC32, confLocal.getBKDigestPW().getBytes(UTF_8));
 
         try {
@@ -1313,8 +1315,9 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
 
         BKLogSegmentWriter logWriter = writer.getCachedLogWriter();
 
+        BKNamespaceDriver driver = (BKNamespaceDriver) dlm.getNamespaceDriver();
         // fence the ledger
-        dlm.getWriterBKC().get().openLedger(logWriter.getLogSegmentId(),
+        driver.getReaderBKC().get().openLedger(logWriter.getLogSegmentId(),
                 BookKeeper.DigestType.CRC32, confLocal.getBKDigestPW().getBytes(UTF_8));
 
         try {
@@ -1500,6 +1503,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         confLocal.setImmediateFlushEnabled(true);
         confLocal.setReadAheadBatchSize(1);
         confLocal.setReadAheadMaxRecords(1);
+        confLocal.setReadLACLongPollTimeout(49);
         confLocal.setReaderIdleWarnThresholdMillis(100);
         confLocal.setReaderIdleErrorThresholdMillis(20000);
         final DistributedLogManager dlm = createNewDLM(confLocal, name);
@@ -1976,7 +1980,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         List<LogSegmentMetadata> segments = dlm.getLogSegments();
         assertEquals(1, segments.size());
         long ledgerId = segments.get(0).getLogSegmentId();
-        LedgerHandle lh = ((BKDistributedLogNamespace) namespace).getReaderBKC()
+        LedgerHandle lh = ((BKNamespaceDriver) namespace.getNamespaceDriver()).getReaderBKC()
                 .get().openLedgerNoRecovery(ledgerId, BookKeeper.DigestType.CRC32, confLocal.getBKDigestPW().getBytes(UTF_8));
         LedgerMetadata metadata = BookKeeperAccessor.getLedgerMetadata(lh);
         assertEquals(DistributedLogConfiguration.BKDL_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT, metadata.getEnsembleSize());
@@ -1995,7 +1999,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         segments = dlm.getLogSegments();
         assertEquals(1, segments.size());
         ledgerId = segments.get(0).getLogSegmentId();
-        lh = ((BKDistributedLogNamespace) namespace).getReaderBKC()
+        lh = ((BKNamespaceDriver) namespace.getNamespaceDriver()).getReaderBKC()
                 .get().openLedgerNoRecovery(ledgerId, BookKeeper.DigestType.CRC32, confLocal.getBKDigestPW().getBytes(UTF_8));
         metadata = BookKeeperAccessor.getLedgerMetadata(lh);
         assertEquals(DistributedLogConfiguration.BKDL_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT - 1, metadata.getEnsembleSize());
@@ -2147,6 +2151,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         confLocal.setImmediateFlushEnabled(false);
         confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
         confLocal.setPeriodicKeepAliveMilliSeconds(0);
+        confLocal.setReadLACLongPollTimeout(9);
         confLocal.setReaderIdleWarnThresholdMillis(20);
         confLocal.setReaderIdleErrorThresholdMillis(40);
 
@@ -2178,6 +2183,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         confLocal.setImmediateFlushEnabled(false);
         confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
         confLocal.setPeriodicKeepAliveMilliSeconds(1000);
+        confLocal.setReadLACLongPollTimeout(999);
         confLocal.setReaderIdleWarnThresholdMillis(2000);
         confLocal.setReaderIdleErrorThresholdMillis(4000);
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
index 96c33e2..f7d587d 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -32,6 +33,7 @@ import com.twitter.distributedlog.exceptions.LogEmptyException;
 import com.twitter.distributedlog.exceptions.LogNotFoundException;
 import com.twitter.distributedlog.exceptions.LogReadException;
 import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore;
+import com.twitter.distributedlog.io.Abortables;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
@@ -48,14 +50,12 @@ import com.twitter.distributedlog.callback.LogSegmentListener;
 import com.twitter.distributedlog.exceptions.EndOfStreamException;
 import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
 import com.twitter.distributedlog.exceptions.LogRecordTooLongException;
-import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
 import com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException;
 import com.twitter.distributedlog.metadata.LogMetadata;
 import com.twitter.distributedlog.metadata.MetadataUpdater;
 import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
 import com.twitter.distributedlog.namespace.DistributedLogNamespace;
 import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import com.twitter.distributedlog.subscription.SubscriptionStateStore;
 import com.twitter.distributedlog.subscription.SubscriptionsStore;
 import com.twitter.util.Await;
 import com.twitter.util.Duration;
@@ -67,6 +67,8 @@ import static org.junit.Assert.assertEquals;
 public class TestBKDistributedLogManager extends TestDistributedLogBase {
     static final Logger LOG = LoggerFactory.getLogger(TestBKDistributedLogManager.class);
 
+    private static final Random RAND = new Random(System.currentTimeMillis());
+
     @Rule
     public TestName testNames = new TestName();
 
@@ -254,20 +256,6 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
     }
 
     @Test(timeout = 60000)
-    public void testTwoWriters() throws Exception {
-        DLMTestUtil.BKLogPartitionWriteHandlerAndClients bkdlm1 =
-                createNewBKDLM(conf, "distrlog-dualWriter");
-        try {
-             createNewBKDLM(conf, "distrlog-dualWriter");
-            fail("Shouldn't have been able to open the second writer");
-        } catch (OwnershipAcquireFailedException ioe) {
-            assertEquals(ioe.getCurrentOwner(), DistributedLogConstants.UNKNOWN_CLIENT_ID);
-        }
-
-        bkdlm1.close();
-    }
-
-    @Test(timeout = 60000)
     public void testTwoWritersOnLockDisabled() throws Exception {
         DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
         confLocal.addConfiguration(conf);
@@ -468,11 +456,10 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
         writer.setReadyToFlush();
         writer.flushAndSync();
         writer.close();
-        dlm.createOrUpdateMetadata(name.getBytes());
-        assertEquals(name, new String(dlm.getMetadata()));
+        dlm.close();
 
         URI uri = createDLMURI("/" + name);
-        BKDistributedLogNamespace namespace = BKDistributedLogNamespace.newBuilder()
+        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
                 .conf(conf).uri(uri).build();
         assertTrue(namespace.logExists(name));
         assertFalse(namespace.logExists("non-existent-log"));
@@ -490,9 +477,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
         }
         assertEquals(1, logCount);
 
-        for(Map.Entry<String, byte[]> logEntry: namespace.enumerateLogsWithMetadataInNamespace().entrySet()) {
-            assertEquals(name, new String(logEntry.getValue()));
-        }
+        namespace.close();
     }
 
     @Test(timeout = 60000)
@@ -507,28 +492,6 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
     }
 
     @Test(timeout = 60000)
-    @Deprecated
-    public void testSubscriptionStateStore() throws Exception {
-        String name = "distrlog-subscription-state";
-        String subscriberId = "defaultSubscriber";
-        DLSN commitPosition0 = new DLSN(4, 33, 5);
-        DLSN commitPosition1 = new DLSN(4, 34, 5);
-        DLSN commitPosition2 = new DLSN(5, 34, 5);
-
-        DistributedLogManager dlm = createNewDLM(conf, name);
-        SubscriptionStateStore store = dlm.getSubscriptionStateStore(subscriberId);
-        assertEquals(Await.result(store.getLastCommitPosition()), DLSN.NonInclusiveLowerBound);
-        Await.result(store.advanceCommitPosition(commitPosition1));
-        assertEquals(Await.result(store.getLastCommitPosition()), commitPosition1);
-        Await.result(store.advanceCommitPosition(commitPosition0));
-        assertEquals(Await.result(store.getLastCommitPosition()), commitPosition1);
-        Await.result(store.advanceCommitPosition(commitPosition2));
-        assertEquals(Await.result(store.getLastCommitPosition()), commitPosition2);
-        SubscriptionStateStore store1 = dlm.getSubscriptionStateStore(subscriberId);
-        assertEquals(Await.result(store1.getLastCommitPosition()), commitPosition2);
-    }
-
-    @Test(timeout = 60000)
     public void testSubscriptionsStore() throws Exception {
         String name = "distrlog-subscriptions-store";
         String subscriber0 = "subscriber-0";
@@ -732,24 +695,12 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
         reader.close();
     }
 
-    @Test(timeout = 60000)
+    @Test(timeout = 60000, expected = LogRecordTooLongException.class)
     public void testMaxLogRecSize() throws Exception {
-        DLMTestUtil.BKLogPartitionWriteHandlerAndClients bkdlmAndClients =
-                createNewBKDLM(conf, "distrlog-maxlogRecSize");
-        long txid = 1;
-        BKLogSegmentWriter out = bkdlmAndClients.getWriteHandler().startLogSegment(1);
-        boolean exceptionEncountered = false;
-        try {
-            LogRecord op = new LogRecord(txid, DLMTestUtil.repeatString(
-                                DLMTestUtil.repeatString("abcdefgh", 256), 512).getBytes());
-            out.write(op);
-        } catch (LogRecordTooLongException exc) {
-            exceptionEncountered = true;
-        } finally {
-            FutureUtils.result(out.asyncClose());
-        }
-        bkdlmAndClients.close();
-        assertTrue(exceptionEncountered);
+        DistributedLogManager dlm = createNewDLM(conf, "distrlog-maxlogRecSize");
+        AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
+        FutureUtils.result(writer.write(new LogRecord(1L, DLMTestUtil.repeatString(
+                                DLMTestUtil.repeatString("abcdefgh", 256), 512).getBytes())));
     }
 
     @Test(timeout = 60000)
@@ -757,25 +708,27 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
         DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
         confLocal.loadConf(conf);
         confLocal.setOutputBufferSize(1024 * 1024);
-        DLMTestUtil.BKLogPartitionWriteHandlerAndClients bkdlmAndClients =
-                createNewBKDLM(confLocal, "distrlog-transmissionSize");
-        long txid = 1;
-        BKLogSegmentWriter out = bkdlmAndClients.getWriteHandler().startLogSegment(1);
+        BKDistributedLogManager dlm =
+                createNewDLM(confLocal, "distrlog-transmissionSize");
+        AsyncLogWriter out = FutureUtils.result(dlm.openAsyncLogWriter());
         boolean exceptionEncountered = false;
-        byte[] largePayload = DLMTestUtil.repeatString(DLMTestUtil.repeatString("abcdefgh", 256), 256).getBytes();
+        byte[] largePayload = new byte[(LogRecord.MAX_LOGRECORDSET_SIZE / 2) + 2];
+        RAND.nextBytes(largePayload);
         try {
-            while (txid < 3) {
-                LogRecord op = new LogRecord(txid, largePayload);
-                out.write(op);
-                txid++;
-            }
+            LogRecord op = new LogRecord(1L, largePayload);
+            Future<DLSN> firstWriteFuture = out.write(op);
+            op = new LogRecord(2L, largePayload);
+            // the second write will flush the first one, since we reached the maximum transmission size.
+            out.write(op);
+            FutureUtils.result(firstWriteFuture);
         } catch (LogRecordTooLongException exc) {
             exceptionEncountered = true;
         } finally {
             FutureUtils.result(out.asyncClose());
         }
-        bkdlmAndClients.close();
-        assertTrue(!exceptionEncountered);
+        assertFalse(exceptionEncountered);
+        Abortables.abortQuietly(out);
+        dlm.close();
     }
 
     @Test(timeout = 60000)

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java
index ecc20e0..a8a82fa 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java
@@ -34,9 +34,10 @@ import com.twitter.distributedlog.exceptions.AlreadyClosedException;
 import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
 import com.twitter.distributedlog.exceptions.LockingException;
 import com.twitter.distributedlog.exceptions.ZKException;
-import com.twitter.distributedlog.impl.BKDLUtils;
+import com.twitter.distributedlog.impl.BKNamespaceDriver;
 import com.twitter.distributedlog.namespace.DistributedLogNamespace;
 import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import com.twitter.distributedlog.util.DLUtils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
@@ -112,63 +113,13 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase {
     }
 
     @Test(timeout = 60000)
-    @SuppressWarnings("deprecation")
-    public void testClientSharingOptions() throws Exception {
-        URI uri = createDLMURI("/clientSharingOptions");
-        BKDistributedLogNamespace namespace = BKDistributedLogNamespace.newBuilder()
-                .conf(conf).uri(uri).build();
-
-        {
-            BKDistributedLogManager bkdlm1 = (BKDistributedLogManager)namespace.createDistributedLogManager("perstream1",
-                                        DistributedLogManagerFactory.ClientSharingOption.PerStreamClients);
-
-            BKDistributedLogManager bkdlm2 = (BKDistributedLogManager)namespace.createDistributedLogManager("perstream2",
-                DistributedLogManagerFactory.ClientSharingOption.PerStreamClients);
-
-            assertThat(bkdlm1.getReaderBKC(), not(bkdlm2.getReaderBKC()));
-            assertThat(bkdlm1.getWriterBKC(), not(bkdlm2.getWriterBKC()));
-            assertThat(bkdlm1.getReaderZKC(), not(bkdlm2.getReaderZKC()));
-            assertThat(bkdlm1.getWriterZKC(), not(bkdlm2.getWriterZKC()));
-
-        }
-
-        {
-            BKDistributedLogManager bkdlm1 = (BKDistributedLogManager)namespace.createDistributedLogManager("sharedZK1",
-                DistributedLogManagerFactory.ClientSharingOption.SharedZKClientPerStreamBKClient);
-
-            BKDistributedLogManager bkdlm2 = (BKDistributedLogManager)namespace.createDistributedLogManager("sharedZK2",
-                DistributedLogManagerFactory.ClientSharingOption.SharedZKClientPerStreamBKClient);
-
-            assertThat(bkdlm1.getReaderBKC(), not(bkdlm2.getReaderBKC()));
-            assertThat(bkdlm1.getWriterBKC(), not(bkdlm2.getWriterBKC()));
-            assertEquals(bkdlm1.getReaderZKC(), bkdlm2.getReaderZKC());
-            assertEquals(bkdlm1.getWriterZKC(), bkdlm2.getWriterZKC());
-        }
-
-        {
-            BKDistributedLogManager bkdlm1 = (BKDistributedLogManager)namespace.createDistributedLogManager("sharedBoth1",
-                DistributedLogManagerFactory.ClientSharingOption.SharedClients);
-
-            BKDistributedLogManager bkdlm2 = (BKDistributedLogManager)namespace.createDistributedLogManager("sharedBoth2",
-                DistributedLogManagerFactory.ClientSharingOption.SharedClients);
-
-            assertEquals(bkdlm1.getReaderBKC(), bkdlm2.getReaderBKC());
-            assertEquals(bkdlm1.getWriterBKC(), bkdlm2.getWriterBKC());
-            assertEquals(bkdlm1.getReaderZKC(), bkdlm2.getReaderZKC());
-            assertEquals(bkdlm1.getWriterZKC(), bkdlm2.getWriterZKC());
-        }
-
-    }
-
-
-    @Test(timeout = 60000)
     public void testInvalidStreamName() throws Exception {
-        assertFalse(BKDLUtils.isReservedStreamName("test"));
-        assertTrue(BKDLUtils.isReservedStreamName(".test"));
+        assertFalse(DLUtils.isReservedStreamName("test"));
+        assertTrue(DLUtils.isReservedStreamName(".test"));
 
         URI uri = createDLMURI("/" + runtime.getMethodName());
 
-        BKDistributedLogNamespace namespace = BKDistributedLogNamespace.newBuilder()
+        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
                 .conf(conf).uri(uri).build();
 
         try {
@@ -238,11 +189,6 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase {
         assertTrue(streamSet.contains("test1"));
         assertTrue(streamSet.contains("test_2-3"));
 
-        Map<String, byte[]> streamMetadatas = namespace.enumerateLogsWithMetadataInNamespace();
-        assertEquals(2, streamMetadatas.size());
-        assertTrue(streamMetadatas.containsKey("test1"));
-        assertTrue(streamMetadatas.containsKey("test_2-3"));
-
         namespace.close();
     }
 
@@ -385,7 +331,7 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase {
 
     static void validateBadAllocatorConfiguration(DistributedLogConfiguration conf, URI uri) throws Exception {
         try {
-            BKDistributedLogNamespace.validateAndGetFullLedgerAllocatorPoolPath(conf, uri);
+            BKNamespaceDriver.validateAndGetFullLedgerAllocatorPoolPath(conf, uri);
             fail("Should throw exception when bad allocator configuration provided");
         } catch (IOException ioe) {
             // expected
@@ -425,7 +371,7 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase {
     @Test(timeout = 60000)
     public void testUseNamespaceAfterCloseShouldFailFast() throws Exception {
         URI uri = createDLMURI("/" + runtime.getMethodName());
-        BKDistributedLogNamespace namespace = BKDistributedLogNamespace.newBuilder()
+        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
             .conf(conf)
             .uri(uri)
             .build();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogReadHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogReadHandler.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogReadHandler.java
index 4b17500..854cb74 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogReadHandler.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogReadHandler.java
@@ -50,27 +50,6 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
     @Rule
     public TestName runtime = new TestName();
 
-    private void prepareLogSegments(String name, int numSegments, int numEntriesPerSegment) throws Exception {
-        DLMTestUtil.BKLogPartitionWriteHandlerAndClients bkdlmAndClients = createNewBKDLM(conf, name);
-        long txid = 1;
-        for (int sid = 0; sid < numSegments; ++sid) {
-            BKLogSegmentWriter out = bkdlmAndClients.getWriteHandler().startLogSegment(txid);
-            for (int eid = 0; eid < numEntriesPerSegment; ++eid) {
-                LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid);
-                out.write(record);
-                ++txid;
-            }
-            FutureUtils.result(out.asyncClose());
-            bkdlmAndClients.getWriteHandler().completeAndCloseLogSegment(
-                    out.getLogSegmentSequenceNumber(),
-                    out.getLogSegmentId(),
-                    1 + sid * numEntriesPerSegment,
-                    (sid + 1) * numEntriesPerSegment,
-                    numEntriesPerSegment);
-        }
-        bkdlmAndClients.close();
-    }
-
     private void prepareLogSegmentsNonPartitioned(String name, int numSegments, int numEntriesPerSegment) throws Exception {
         DistributedLogManager dlm = createNewDLM(conf, name);
         long txid = 1;
@@ -134,8 +113,8 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
     @Test(timeout = 60000)
     public void testGetFirstDLSNWithLogSegments() throws Exception {
         String dlName = runtime.getMethodName();
-        prepareLogSegments(dlName, 3, 3);
         BKDistributedLogManager dlm = createNewDLM(conf, dlName);
+        DLMTestUtil.generateCompletedLogSegments(dlm, conf, 3, 3);
         BKLogReadHandler readHandler = dlm.createReadHandler();
         Future<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord();
         try {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java
index b350255..8f86192 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java
@@ -21,14 +21,14 @@ import com.twitter.distributedlog.exceptions.BKTransmitException;
 import com.twitter.distributedlog.exceptions.EndOfStreamException;
 import com.twitter.distributedlog.exceptions.WriteCancelledException;
 import com.twitter.distributedlog.exceptions.WriteException;
+import com.twitter.distributedlog.impl.BKNamespaceDriver;
 import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryWriter;
 import com.twitter.distributedlog.io.Abortables;
 import com.twitter.distributedlog.lock.SessionLockFactory;
 import com.twitter.distributedlog.lock.ZKDistributedLock;
 import com.twitter.distributedlog.lock.ZKSessionLockFactory;
-import com.twitter.distributedlog.metadata.BKDLConfig;
+import com.twitter.distributedlog.impl.metadata.BKDLConfig;
 import com.twitter.distributedlog.util.ConfUtils;
-import com.twitter.distributedlog.util.DLUtils;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.PermitLimiter;
@@ -96,7 +96,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
                 .dlConfig(conf)
                 .name("test-bkc")
                 .ledgersPath(bkdlConfig.getBkLedgersPath())
-                .zkServers(DLUtils.getZKServersFromDLUri(uri))
+                .zkServers(BKNamespaceDriver.getZKServersFromDLUri(uri))
                 .build();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogWriteHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogWriteHandler.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogWriteHandler.java
index 754f945..a0485bd 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogWriteHandler.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogWriteHandler.java
@@ -19,6 +19,7 @@ package com.twitter.distributedlog;
 
 import com.twitter.distributedlog.bk.LedgerAllocator;
 import com.twitter.distributedlog.bk.LedgerAllocatorPool;
+import com.twitter.distributedlog.impl.BKNamespaceDriver;
 import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
 import com.twitter.distributedlog.util.FailpointUtils;
 import com.twitter.distributedlog.util.FutureUtils;
@@ -75,7 +76,8 @@ public class TestBKLogWriteHandler extends TestDistributedLogBase {
                     FailpointUtils.FailPointName.FP_StartLogSegmentOnAssignLogSegmentSequenceNumber);
         }
 
-        LedgerAllocator allocator = namespace.getLedgerAllocator();
+        LedgerAllocator allocator = ((BKNamespaceDriver) namespace.getNamespaceDriver())
+                .getLedgerAllocator();
         assertTrue(allocator instanceof LedgerAllocatorPool);
         LedgerAllocatorPool allocatorPool = (LedgerAllocatorPool) allocator;
         assertEquals(0, allocatorPool.obtainMapSize());

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java
index a388b68..d850db4 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java
@@ -19,11 +19,24 @@ package com.twitter.distributedlog;
 
 import static org.junit.Assert.assertTrue;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Ticker;
+import com.twitter.distributedlog.impl.BKNamespaceDriver;
 import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryWriter;
+import com.twitter.distributedlog.injector.AsyncFailureInjector;
+import com.twitter.distributedlog.injector.AsyncRandomFailureInjector;
+import com.twitter.distributedlog.io.AsyncCloseable;
 import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter;
+import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
 import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import com.twitter.distributedlog.namespace.NamespaceDriver;
+import com.twitter.distributedlog.util.ConfUtils;
+import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.PermitLimiter;
+import com.twitter.distributedlog.util.SchedulerUtils;
+import com.twitter.util.Future;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.feature.SettableFeatureProvider;
 import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
@@ -43,9 +56,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 public class TestDistributedLogBase {
     static final Logger LOG = LoggerFactory.getLogger(TestDistributedLogBase.class);
@@ -87,6 +102,12 @@ public class TestDistributedLogBase {
                 .build();
         bkutil.start();
         zkServers = "127.0.0.1:" + zkPort;
+        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                LOG.warn("Uncaught exception at Thread {} : ", t.getName(), e);
+            }
+        });
     }
 
     @AfterClass
@@ -141,22 +162,7 @@ public class TestDistributedLogBase {
 
     public BKDistributedLogManager createNewDLM(DistributedLogConfiguration conf,
                                                 String name) throws Exception {
-        URI uri = createDLMURI("/" + name);
-        ensureURICreated(uri);
-        return new BKDistributedLogManager(
-                name,
-                conf,
-                uri,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                new SettableFeatureProvider("", 0),
-                PermitLimiter.NULL_PERMIT_LIMITER,
-                NullStatsLogger.INSTANCE
-        );
+        return createNewDLM(conf, name, PermitLimiter.NULL_PERMIT_LIMITER);
     }
 
     public BKDistributedLogManager createNewDLM(DistributedLogConfiguration conf,
@@ -165,48 +171,69 @@ public class TestDistributedLogBase {
             throws Exception {
         URI uri = createDLMURI("/" + name);
         ensureURICreated(uri);
+        final DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+                .uri(uri)
+                .conf(conf)
+                .build();
+        final OrderedScheduler scheduler = OrderedScheduler.newBuilder()
+                .corePoolSize(1)
+                .name("test-scheduler")
+                .build();
+        AsyncCloseable resourcesCloseable = new AsyncCloseable() {
+            @Override
+            public Future<Void> asyncClose() {
+                LOG.info("Shutting down the scheduler");
+                SchedulerUtils.shutdownScheduler(scheduler, 1, TimeUnit.SECONDS);
+                LOG.info("Shut down the scheduler");
+                LOG.info("Closing the namespace");
+                namespace.close();
+                LOG.info("Closed the namespace");
+                return Future.Void();
+            }
+        };
+        AsyncFailureInjector failureInjector = AsyncRandomFailureInjector.newBuilder()
+                .injectDelays(conf.getEIInjectReadAheadDelay(),
+                        conf.getEIInjectReadAheadDelayPercent(),
+                        conf.getEIInjectMaxReadAheadDelayMs())
+                .injectErrors(false, 10)
+                .injectStops(conf.getEIInjectReadAheadStall(), 10)
+                .injectCorruption(conf.getEIInjectReadAheadBrokenEntries())
+                .build();
         return new BKDistributedLogManager(
                 name,
                 conf,
+                ConfUtils.getConstDynConf(conf),
                 uri,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                new SettableFeatureProvider("", 0),
+                namespace.getNamespaceDriver(),
+                new LogSegmentMetadataCache(conf, Ticker.systemTicker()),
+                scheduler,
+                DistributedLogConstants.UNKNOWN_CLIENT_ID,
+                DistributedLogConstants.LOCAL_REGION_ID,
                 writeLimiter,
-                NullStatsLogger.INSTANCE
-        );
-    }
-
-    public DLMTestUtil.BKLogPartitionWriteHandlerAndClients createNewBKDLM(
-            DistributedLogConfiguration conf,
-            String path) throws Exception {
-        return DLMTestUtil.createNewBKDLM(conf, path, zkPort);
+                new SettableFeatureProvider("", 0),
+                failureInjector,
+                NullStatsLogger.INSTANCE,
+                NullStatsLogger.INSTANCE,
+                Optional.of(resourcesCloseable));
     }
 
-    @SuppressWarnings("deprecation")
-    protected LogSegmentMetadataStore getLogSegmentMetadataStore(DistributedLogManagerFactory factory) {
-        DistributedLogNamespace namespace = factory.getNamespace();
-        assertTrue(namespace instanceof BKDistributedLogNamespace);
-        return ((BKDistributedLogNamespace) namespace).getWriterStreamMetadataStore()
+    protected LogSegmentMetadataStore getLogSegmentMetadataStore(DistributedLogNamespace namespace)
+            throws IOException {
+        return namespace.getNamespaceDriver().getLogStreamMetadataStore(NamespaceDriver.Role.READER)
                 .getLogSegmentMetadataStore();
     }
 
-    @SuppressWarnings("deprecation")
-    protected ZooKeeperClient getZooKeeperClient(DistributedLogManagerFactory factory) throws Exception {
-        DistributedLogNamespace namespace = factory.getNamespace();
-        assertTrue(namespace instanceof BKDistributedLogNamespace);
-        return ((BKDistributedLogNamespace) namespace).getSharedWriterZKCForDL();
+    protected ZooKeeperClient getZooKeeperClient(DistributedLogNamespace namespace) throws Exception {
+        NamespaceDriver driver = namespace.getNamespaceDriver();
+        assertTrue(driver instanceof BKNamespaceDriver);
+        return ((BKNamespaceDriver) driver).getWriterZKC();
     }
 
     @SuppressWarnings("deprecation")
-    protected BookKeeperClient getBookKeeperClient(DistributedLogManagerFactory factory) throws Exception {
-        DistributedLogNamespace namespace = factory.getNamespace();
-        assertTrue(namespace instanceof BKDistributedLogNamespace);
-        return ((BKDistributedLogNamespace) namespace).getReaderBKC();
+    protected BookKeeperClient getBookKeeperClient(DistributedLogNamespace namespace) throws Exception {
+        NamespaceDriver driver = namespace.getNamespaceDriver();
+        assertTrue(driver instanceof BKNamespaceDriver);
+        return ((BKNamespaceDriver) driver).getReaderBKC();
     }
 
     protected LedgerHandle getLedgerHandle(BKLogSegmentWriter segmentWriter) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestFailureAndRecovery.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestFailureAndRecovery.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestFailureAndRecovery.java
deleted file mode 100644
index e86e45a..0000000
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestFailureAndRecovery.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.twitter.distributedlog.exceptions.BKTransmitException;
-import com.twitter.distributedlog.io.Abortables;
-import com.twitter.distributedlog.util.FutureUtils;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class TestFailureAndRecovery extends TestDistributedLogBase {
-    static final Log LOG = LogFactory.getLog(TestFailureAndRecovery.class);
-
-    @Test(timeout = 60000)
-    public void testSimpleRecovery() throws Exception {
-        DLMTestUtil.BKLogPartitionWriteHandlerAndClients bkdlmAndClients = createNewBKDLM(conf, "distrlog-simplerecovery");
-        BKLogSegmentWriter out = bkdlmAndClients.getWriteHandler().startLogSegment(1);
-        long txid = 1;
-        for (long i = 1; i <= 100; i++) {
-            LogRecord op = DLMTestUtil.getLogRecordInstance(txid++);
-            out.write(op);
-            if ((i % 10) == 0) {
-                FutureUtils.result(out.flushAndCommit());
-            }
-
-        }
-        FutureUtils.result(out.flushAndCommit());
-
-        Abortables.abort(out, false);
-        FutureUtils.result(out.asyncClose());
-
-        assertNull(zkc.exists(bkdlmAndClients.getWriteHandler().completedLedgerZNode(1, 100, out.getLogSegmentSequenceNumber()), false));
-        assertNotNull(zkc.exists(bkdlmAndClients.getWriteHandler().inprogressZNode(out.getLogSegmentId(), 1, out.getLogSegmentSequenceNumber()), false));
-
-        FutureUtils.result(bkdlmAndClients.getWriteHandler().recoverIncompleteLogSegments());
-
-        assertNotNull(zkc.exists(bkdlmAndClients.getWriteHandler().completedLedgerZNode(1, 100, out.getLogSegmentSequenceNumber()), false));
-        assertNull(zkc.exists(bkdlmAndClients.getWriteHandler().inprogressZNode(out.getLogSegmentId(), 1, out.getLogSegmentSequenceNumber()), false));
-    }
-
-    /**
-     * Test that if enough bookies fail to prevent an ensemble,
-     * writes the bookkeeper will fail. Test that when once again
-     * an ensemble is available, it can continue to write.
-     */
-    @Test(timeout = 60000)
-    public void testAllBookieFailure() throws Exception {
-        BookieServer bookieToFail = bkutil.newBookie();
-        BookieServer replacementBookie = null;
-
-        try {
-            int ensembleSize = numBookies + 1;
-            assertEquals("Begin: New bookie didn't start",
-                ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
-
-            // ensure that the journal manager has to use all bookies,
-            // so that a failure will fail the journal manager
-            DistributedLogConfiguration conf = new DistributedLogConfiguration();
-            conf.setEnsembleSize(ensembleSize);
-            conf.setWriteQuorumSize(ensembleSize);
-            conf.setAckQuorumSize(ensembleSize);
-            long txid = 1;
-            DLMTestUtil.BKLogPartitionWriteHandlerAndClients bkdlmAndClients = createNewBKDLM(conf, "distrlog-allbookiefailure");
-            BKLogSegmentWriter out = bkdlmAndClients.getWriteHandler().startLogSegment(txid);
-
-            for (long i = 1; i <= 3; i++) {
-                LogRecord op = DLMTestUtil.getLogRecordInstance(txid++);
-                out.write(op);
-            }
-            FutureUtils.result(out.flushAndCommit());
-            bookieToFail.shutdown();
-            assertEquals("New bookie didn't die",
-                numBookies, bkutil.checkBookiesUp(numBookies, 10));
-
-            try {
-                for (long i = 1; i <= 3; i++) {
-                    LogRecord op = DLMTestUtil.getLogRecordInstance(txid++);
-                    out.write(op);
-                    txid++;
-                }
-                FutureUtils.result(out.flushAndCommit());
-                fail("should not get to this stage");
-            } catch (BKTransmitException bkte) {
-                LOG.debug("Error writing to bookkeeper", bkte);
-                assertEquals("Invalid exception message",
-                        BKException.Code.NotEnoughBookiesException, bkte.getBKResultCode());
-            }
-            replacementBookie = bkutil.newBookie();
-
-            assertEquals("Replacement: New bookie didn't start",
-                numBookies + 1, bkutil.checkBookiesUp(numBookies + 1, 10));
-            out = bkdlmAndClients.getWriteHandler().startLogSegment(txid);
-            for (long i = 1; i <= 3; i++) {
-                LogRecord op = DLMTestUtil.getLogRecordInstance(txid++);
-                out.write(op);
-            }
-
-            FutureUtils.result(out.flushAndCommit());
-        } catch (Exception e) {
-            LOG.error("Exception in test", e);
-            throw e;
-        } finally {
-            if (replacementBookie != null) {
-                replacementBookie.shutdown();
-            }
-            bookieToFail.shutdown();
-
-            if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
-                LOG.warn("Not all bookies from this test shut down, expect errors");
-            }
-        }
-    }
-
-    /**
-     * Test that a BookKeeper JM can continue to work across the
-     * failure of a bookie. This should be handled transparently
-     * by bookkeeper.
-     */
-    @Test(timeout = 60000)
-    public void testOneBookieFailure() throws Exception {
-        BookieServer bookieToFail = bkutil.newBookie();
-        BookieServer replacementBookie = null;
-
-        try {
-            int ensembleSize = numBookies + 1;
-            assertEquals("New bookie didn't start",
-                ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
-
-            // ensure that the journal manager has to use all bookies,
-            // so that a failure will fail the journal manager
-            DistributedLogConfiguration conf = new DistributedLogConfiguration();
-            conf.setEnsembleSize(ensembleSize);
-            conf.setWriteQuorumSize(ensembleSize);
-            conf.setAckQuorumSize(ensembleSize);
-            long txid = 1;
-            DLMTestUtil.BKLogPartitionWriteHandlerAndClients bkdlmAndClients = createNewBKDLM(conf, "distrlog-onebookiefailure");
-            BKLogSegmentWriter out = bkdlmAndClients.getWriteHandler().startLogSegment(txid);
-            for (long i = 1; i <= 3; i++) {
-                LogRecord op = DLMTestUtil.getLogRecordInstance(txid++);
-                out.write(op);
-            }
-            FutureUtils.result(out.flushAndCommit());
-
-            replacementBookie = bkutil.newBookie();
-            assertEquals("replacement bookie didn't start",
-                ensembleSize + 1, bkutil.checkBookiesUp(ensembleSize + 1, 10));
-            bookieToFail.shutdown();
-            assertEquals("New bookie didn't die",
-                ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
-
-            for (long i = 1; i <= 3; i++) {
-                LogRecord op = DLMTestUtil.getLogRecordInstance(txid++);
-                out.write(op);
-            }
-            FutureUtils.result(out.flushAndCommit());
-        } catch (Exception e) {
-            LOG.error("Exception in test", e);
-            throw e;
-        } finally {
-            if (replacementBookie != null) {
-                replacementBookie.shutdown();
-            }
-            bookieToFail.shutdown();
-
-            if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
-                LOG.warn("Not all bookies from this test shut down, expect errors");
-            }
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testRecoveryEmptyLedger() throws Exception {
-        DLMTestUtil.BKLogPartitionWriteHandlerAndClients bkdlmAndClients = createNewBKDLM(conf, "distrlog-recovery-empty-ledger");
-        BKLogSegmentWriter out = bkdlmAndClients.getWriteHandler().startLogSegment(1);
-        long txid = 1;
-        for (long i = 1; i <= 100; i++) {
-            LogRecord op = DLMTestUtil.getLogRecordInstance(txid++);
-            out.write(op);
-            if ((i % 10) == 0) {
-                FutureUtils.result(out.flushAndCommit());
-            }
-
-        }
-        FutureUtils.result(out.flushAndCommit());
-        FutureUtils.result(out.asyncClose());
-        bkdlmAndClients.getWriteHandler().completeAndCloseLogSegment(out.getLogSegmentSequenceNumber(), out.getLogSegmentId(), 1, 100, 100);
-        assertNotNull(zkc.exists(bkdlmAndClients.getWriteHandler().completedLedgerZNode(1, 100, out.getLogSegmentSequenceNumber()), false));
-        BKLogSegmentWriter outEmpty = bkdlmAndClients.getWriteHandler().startLogSegment(101);
-        Abortables.abort(outEmpty, false);
-
-        assertNull(zkc.exists(bkdlmAndClients.getWriteHandler().completedLedgerZNode(101, 101, outEmpty.getLogSegmentSequenceNumber()), false));
-        assertNotNull(zkc.exists(bkdlmAndClients.getWriteHandler().inprogressZNode(outEmpty.getLogSegmentId(), 101, outEmpty.getLogSegmentSequenceNumber()), false));
-
-        FutureUtils.result(bkdlmAndClients.getWriteHandler().recoverIncompleteLogSegments());
-
-        assertNull(zkc.exists(bkdlmAndClients.getWriteHandler().inprogressZNode(outEmpty.getLogSegmentId(), outEmpty.getLogSegmentSequenceNumber(), 101), false));
-        assertNotNull(zkc.exists(bkdlmAndClients.getWriteHandler().completedLedgerZNode(101, 101, outEmpty.getLogSegmentSequenceNumber()), false));
-    }
-
-    @Test(timeout = 60000)
-    public void testRecoveryAPI() throws Exception {
-        DistributedLogManager dlm = createNewDLM(conf, "distrlog-recovery-api");
-        BKSyncLogWriter out = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned();
-        long txid = 1;
-        for (long i = 1; i <= 100; i++) {
-            LogRecord op = DLMTestUtil.getLogRecordInstance(txid++);
-            out.write(op);
-            if ((i % 10) == 0) {
-                out.setReadyToFlush();
-                out.flushAndSync();
-            }
-
-        }
-        BKLogSegmentWriter perStreamLogWriter = out.getCachedLogWriter();
-        out.setReadyToFlush();
-        out.flushAndSync();
-
-        out.abort();
-
-        BKLogWriteHandler blplm1 = ((BKDistributedLogManager) (dlm)).createWriteHandler(true);
-
-        assertNull(zkc.exists(blplm1.completedLedgerZNode(1, 100,
-                                                          perStreamLogWriter.getLogSegmentSequenceNumber()), false));
-        assertNotNull(zkc.exists(blplm1.inprogressZNode(perStreamLogWriter.getLogSegmentId(), 1,
-                                                        perStreamLogWriter.getLogSegmentSequenceNumber()), false));
-
-        dlm.recover();
-
-        assertNotNull(zkc.exists(blplm1.completedLedgerZNode(1, 100,
-                                                             perStreamLogWriter.getLogSegmentSequenceNumber()), false));
-        assertNull(zkc.exists(blplm1.inprogressZNode(perStreamLogWriter.getLogSegmentId(), 1,
-                                                     perStreamLogWriter.getLogSegmentSequenceNumber()), false));
-        FutureUtils.result(blplm1.asyncClose());
-        assertEquals(100, dlm.getLogRecordCount());
-        dlm.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestInterleavedReaders.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestInterleavedReaders.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestInterleavedReaders.java
index 3cdd676..830e059 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestInterleavedReaders.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestInterleavedReaders.java
@@ -22,8 +22,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -337,72 +335,4 @@ public class TestInterleavedReaders extends TestDistributedLogBase {
         dlmreader1.close();
     }
 
-    @Test(timeout = 60000)
-    public void testFactorySharedClients() throws Exception {
-        String name = "distrlog-factorysharedclients";
-        testFactory(name, true);
-    }
-
-    @Test(timeout = 60000)
-    public void testFactorySharedZK() throws Exception {
-        String name = "distrlog-factorysharedZK";
-        testFactory(name, false);
-    }
-
-    @SuppressWarnings("deprecation")
-    private void testFactory(String name, boolean shareBK) throws Exception {
-        int count = 3;
-        URI uri = createDLMURI("/" + name);
-        ensureURICreated(uri);
-        BKDistributedLogNamespace namespace = BKDistributedLogNamespace.newBuilder()
-                .conf(conf).uri(uri).build();
-        DistributedLogManager[] dlms = new DistributedLogManager[count];
-        for (int s = 0; s < count; s++) {
-            if (shareBK) {
-                dlms[s] = namespace.createDistributedLogManager(name + String.format("%d", s),
-                        DistributedLogManagerFactory.ClientSharingOption.SharedClients);
-            } else {
-                dlms[s] = namespace.createDistributedLogManager(name + String.format("%d", s),
-                        DistributedLogManagerFactory.ClientSharingOption.SharedZKClientPerStreamBKClient);
-            }
-        }
-
-        int txid = 1;
-        for (long i = 0; i < 3; i++) {
-            BKSyncLogWriter[] writers = new BKSyncLogWriter[count];
-            for (int s = 0; s < count; s++) {
-                writers[s] = (BKSyncLogWriter)(dlms[s].startLogSegmentNonPartitioned());
-            }
-
-            for (long j = 0; j < 1; j++) {
-                final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++);
-                for (int s = 0; s < count; s++) {
-                    writers[s].write(record);
-                }
-            }
-            for (int s = 0; s < count; s++) {
-                writers[s].closeAndComplete();
-            }
-
-            if (i < 2) {
-                // Restart the zeroth stream and make sure that the other streams can
-                // continue without restart
-                dlms[0].close();
-                if (shareBK) {
-                    dlms[0] = namespace.createDistributedLogManager(name + String.format("%d", 0),
-                            DistributedLogManagerFactory.ClientSharingOption.SharedClients);
-                } else {
-                    dlms[0] = namespace.createDistributedLogManager(name + String.format("%d", 0),
-                            DistributedLogManagerFactory.ClientSharingOption.SharedZKClientPerStreamBKClient);
-                }
-            }
-
-        }
-
-        for (int s = 0; s < count; s++) {
-            dlms[s].close();
-        }
-
-        namespace.close();
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLogSegmentsZK.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLogSegmentsZK.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLogSegmentsZK.java
index bb67214..06c7bba 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLogSegmentsZK.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLogSegmentsZK.java
@@ -80,10 +80,10 @@ public class TestLogSegmentsZK extends TestDistributedLogBase {
                 .setImmediateFlushEnabled(true)
                 .setEnableLedgerAllocatorPool(true)
                 .setLedgerAllocatorPoolName("test");
-        BKDistributedLogNamespace namespace = BKDistributedLogNamespace.newBuilder().conf(conf).uri(uri).build();
+        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
 
         namespace.createLog(streamName);
-        MaxLogSegmentSequenceNo max1 = getMaxLogSegmentSequenceNo(namespace.getSharedWriterZKCForDL(), uri, streamName, conf);
+        MaxLogSegmentSequenceNo max1 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf);
         assertEquals(DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO, max1.getSequenceNumber());
         DistributedLogManager dlm = namespace.openLog(streamName);
         final int numSegments = 3;
@@ -92,7 +92,7 @@ public class TestLogSegmentsZK extends TestDistributedLogBase {
             out.write(DLMTestUtil.getLogRecordInstance(i));
             out.closeAndComplete();
         }
-        MaxLogSegmentSequenceNo max2 = getMaxLogSegmentSequenceNo(namespace.getSharedWriterZKCForDL(), uri, streamName, conf);
+        MaxLogSegmentSequenceNo max2 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf);
         assertEquals(3, max2.getSequenceNumber());
         dlm.close();
         namespace.close();
@@ -111,10 +111,10 @@ public class TestLogSegmentsZK extends TestDistributedLogBase {
                 .setImmediateFlushEnabled(true)
                 .setEnableLedgerAllocatorPool(true)
                 .setLedgerAllocatorPoolName("test");
-        BKDistributedLogNamespace namespace = BKDistributedLogNamespace.newBuilder().conf(conf).uri(uri).build();
+        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
 
         namespace.createLog(streamName);
-        MaxLogSegmentSequenceNo max1 = getMaxLogSegmentSequenceNo(namespace.getSharedWriterZKCForDL(), uri, streamName, conf);
+        MaxLogSegmentSequenceNo max1 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf);
         assertEquals(DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO, max1.getSequenceNumber());
         DistributedLogManager dlm = namespace.openLog(streamName);
         final int numSegments = 3;
@@ -123,11 +123,11 @@ public class TestLogSegmentsZK extends TestDistributedLogBase {
             out.write(DLMTestUtil.getLogRecordInstance(i));
             out.closeAndComplete();
         }
-        MaxLogSegmentSequenceNo max2 = getMaxLogSegmentSequenceNo(namespace.getSharedWriterZKCForDL(), uri, streamName, conf);
+        MaxLogSegmentSequenceNo max2 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf);
         assertEquals(3, max2.getSequenceNumber());
 
         // nuke the max ledger sequence number
-        updateMaxLogSegmentSequenceNo(namespace.getSharedWriterZKCForDL(), uri, streamName, conf, new byte[0]);
+        updateMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf, new byte[0]);
         DistributedLogManager dlm1 = namespace.openLog(streamName);
         try {
             dlm1.startLogSegmentNonPartitioned();
@@ -139,7 +139,7 @@ public class TestLogSegmentsZK extends TestDistributedLogBase {
         }
 
         // invalid max ledger sequence number
-        updateMaxLogSegmentSequenceNo(namespace.getSharedWriterZKCForDL(), uri, streamName, conf, "invalid-max".getBytes(UTF_8));
+        updateMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf, "invalid-max".getBytes(UTF_8));
         DistributedLogManager dlm2 = namespace.openLog(streamName);
         try {
             dlm2.startLogSegmentNonPartitioned();
@@ -167,10 +167,10 @@ public class TestLogSegmentsZK extends TestDistributedLogBase {
                 .setImmediateFlushEnabled(true)
                 .setEnableLedgerAllocatorPool(true)
                 .setLedgerAllocatorPoolName("test");
-        BKDistributedLogNamespace namespace = BKDistributedLogNamespace.newBuilder().conf(conf).uri(uri).build();
+        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
 
         namespace.createLog(streamName);
-        MaxLogSegmentSequenceNo max1 = getMaxLogSegmentSequenceNo(namespace.getSharedWriterZKCForDL(), uri, streamName, conf);
+        MaxLogSegmentSequenceNo max1 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf);
         assertEquals(DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO, max1.getSequenceNumber());
         DistributedLogManager dlm = namespace.openLog(streamName);
         final int numSegments = 3;
@@ -179,11 +179,11 @@ public class TestLogSegmentsZK extends TestDistributedLogBase {
             out.write(DLMTestUtil.getLogRecordInstance(i));
             out.closeAndComplete();
         }
-        MaxLogSegmentSequenceNo max2 = getMaxLogSegmentSequenceNo(namespace.getSharedWriterZKCForDL(), uri, streamName, conf);
+        MaxLogSegmentSequenceNo max2 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf);
         assertEquals(3, max2.getSequenceNumber());
 
         // update the max ledger sequence number
-        updateMaxLogSegmentSequenceNo(namespace.getSharedWriterZKCForDL(), uri, streamName, conf,
+        updateMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf,
                 DLUtils.serializeLogSegmentSequenceNumber(99));
 
         DistributedLogManager dlm1 = namespace.openLog(streamName);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java
index e322234..9553637 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java
@@ -54,6 +54,7 @@ public class TestNonBlockingReads extends TestDistributedLogBase {
         confLocal.setReadAheadBatchSize(1);
         confLocal.setReadAheadMaxRecords(1);
         confLocal.setReaderIdleWarnThresholdMillis(100);
+        confLocal.setReadLACLongPollTimeout(49);
         final DistributedLogManager dlm = createNewDLM(confLocal, name);
         ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
         ScheduledFuture writerClosedFuture = null;
@@ -129,6 +130,7 @@ public class TestNonBlockingReads extends TestDistributedLogBase {
         confLocal.loadConf(conf);
         confLocal.setReadAheadBatchSize(1);
         confLocal.setReadAheadMaxRecords(1);
+        confLocal.setReadLACLongPollTimeout(24);
         confLocal.setReaderIdleWarnThresholdMillis(50);
         confLocal.setReaderIdleErrorThresholdMillis(100);
         final DistributedLogManager dlm = createNewDLM(confLocal, name);
@@ -174,6 +176,7 @@ public class TestNonBlockingReads extends TestDistributedLogBase {
         confLocal.loadConf(conf);
         confLocal.setReadAheadBatchSize(1);
         confLocal.setReadAheadMaxRecords(3);
+        confLocal.setReadLACLongPollTimeout(249);
         confLocal.setReaderIdleWarnThresholdMillis(500);
         confLocal.setReaderIdleErrorThresholdMillis(30000);
         final DistributedLogManager dlm = createNewDLM(confLocal, name);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java
index 74a5231..cf4fc4f 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java
@@ -25,6 +25,7 @@ import com.twitter.distributedlog.exceptions.DLIllegalStateException;
 import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryStore;
 import com.twitter.distributedlog.injector.AsyncFailureInjector;
 import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
+import com.twitter.distributedlog.util.ConfUtils;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.Utils;
@@ -38,6 +39,7 @@ import org.junit.Test;
 import org.junit.rules.TestName;
 
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.*;
 
@@ -54,6 +56,7 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
     private DistributedLogConfiguration baseConf;
     private OrderedScheduler scheduler;
     private BookKeeperClient bkc;
+    private ZooKeeperClient zkc;
 
     @Before
     public void setup() throws Exception {
@@ -66,6 +69,12 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
         baseConf.setReadAheadMaxRecords(MAX_CACHED_ENTRIES);
         baseConf.setNumPrefetchEntriesPerLogSegment(NUM_PREFETCH_ENTRIES);
         baseConf.setMaxPrefetchEntriesPerLogSegment(NUM_PREFETCH_ENTRIES);
+        zkc = ZooKeeperClientBuilder.newBuilder()
+                .name("test-zk")
+                .zkServers(bkutil.getZkServers())
+                .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
+                .zkAclId(conf.getZkAclId())
+                .build();
         bkc = BookKeeperClientBuilder.newBuilder()
                 .name("test-bk")
                 .dlConfig(conf)
@@ -86,6 +95,9 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
         if (null != scheduler) {
             scheduler.shutdown();
         }
+        if (null != zkc) {
+            zkc.close();
+        }
         super.teardown();
     }
 
@@ -99,8 +111,11 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
                 true);
         LogSegmentEntryStore entryStore = new BKLogSegmentEntryStore(
                 conf,
+                ConfUtils.getConstDynConf(conf),
+                zkc,
                 bkc,
                 scheduler,
+                null,
                 NullStatsLogger.INSTANCE,
                 AsyncFailureInjector.NULL);
         return new ReadAheadEntryReader(
@@ -309,7 +324,7 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
         BKDistributedLogManager dlm = createNewDLM(baseConf, streamName);
 
         // generate list of log segments
-        generateCompletedLogSegments(dlm, 3, 2);
+        generateCompletedLogSegments(dlm, 3, 3);
         AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
         FutureUtils.result(writer.truncate(new DLSN(2L, 1L, 0L)));
 
@@ -321,23 +336,39 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
         readAheadEntryReader.start(segments);
         // ensure initialization to complete
         ensureOrderSchedulerEmpty(streamName);
-        expectAlreadyTruncatedTransactionException(readAheadEntryReader,
-                "should fail on positioning to a truncated log segment");
+        expectNoException(readAheadEntryReader);
+        Entry.Reader entryReader =
+                readAheadEntryReader.getNextReadAheadEntry(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        assertEquals(2L, entryReader.getLSSN());
+        assertEquals(1L, entryReader.getEntryId());
+        Utils.close(readAheadEntryReader);
 
         // positioning on a partially truncated log segment (segment 2) before min active dlsn
         readAheadEntryReader = createEntryReader(streamName, new DLSN(2L, 0L, 0L), dlm, baseConf);
         readAheadEntryReader.start(segments);
         // ensure initialization to complete
         ensureOrderSchedulerEmpty(streamName);
-        expectAlreadyTruncatedTransactionException(readAheadEntryReader,
-                "should fail on positioning to a partially truncated log segment");
+        expectNoException(readAheadEntryReader);
+        entryReader =
+                readAheadEntryReader.getNextReadAheadEntry(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        assertEquals(2L, entryReader.getLSSN());
+        assertEquals(1L, entryReader.getEntryId());
+        Utils.close(readAheadEntryReader);
 
         // positioning on a partially truncated log segment (segment 2) after min active dlsn
-        readAheadEntryReader = createEntryReader(streamName, new DLSN(2L, 1L, 0L), dlm, baseConf);
+        readAheadEntryReader = createEntryReader(streamName, new DLSN(2L, 2L, 0L), dlm, baseConf);
         readAheadEntryReader.start(segments);
         // ensure initialization to complete
         ensureOrderSchedulerEmpty(streamName);
         expectNoException(readAheadEntryReader);
+        entryReader =
+                readAheadEntryReader.getNextReadAheadEntry(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        assertEquals(2L, entryReader.getLSSN());
+        assertEquals(2L, entryReader.getEntryId());
+        Utils.close(readAheadEntryReader);
+
+        Utils.close(writer);
+        dlm.close();
     }
 
     @Test(timeout = 60000)
@@ -363,6 +394,7 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
         // ensure initialization to complete
         ensureOrderSchedulerEmpty(streamName);
         expectNoException(readAheadEntryReader);
+        Utils.close(readAheadEntryReader);
 
         // positioning on a partially truncated log segment (segment 2) before min active dlsn
         readAheadEntryReader = createEntryReader(streamName, new DLSN(2L, 0L, 0L), dlm, confLocal);
@@ -370,6 +402,7 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
         // ensure initialization to complete
         ensureOrderSchedulerEmpty(streamName);
         expectNoException(readAheadEntryReader);
+        Utils.close(readAheadEntryReader);
 
         // positioning on a partially truncated log segment (segment 2) after min active dlsn
         readAheadEntryReader = createEntryReader(streamName, new DLSN(2L, 1L, 0L), dlm, confLocal);
@@ -377,6 +410,10 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
         // ensure initialization to complete
         ensureOrderSchedulerEmpty(streamName);
         expectNoException(readAheadEntryReader);
+        Utils.close(readAheadEntryReader);
+
+        Utils.close(writer);
+        dlm.close();
     }
 
     //
@@ -418,6 +455,9 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
         ensureOrderSchedulerEmpty(streamName);
         expectIllegalStateException(readAheadEntryReader,
                 "inconsistent log segment found");
+
+        Utils.close(readAheadEntryReader);
+        dlm.close();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
index 99ef041..b183b84 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
@@ -331,6 +331,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
 
     @FlakyTest
     @Test(timeout = 60000)
+    @SuppressWarnings("deprecation")
     public void testCaughtUpReaderOnLogSegmentRolling() throws Exception {
         String name = "distrlog-caughtup-reader-on-logsegment-rolling";
 
@@ -344,6 +345,8 @@ public class TestRollLogSegments extends TestDistributedLogBase {
         confLocal.setWriteQuorumSize(1);
         confLocal.setAckQuorumSize(1);
         confLocal.setReadLACLongPollTimeout(99999999);
+        confLocal.setReaderIdleWarnThresholdMillis(2 * 99999999 + 1);
+        confLocal.setBKClientReadTimeout(99999999 + 1);
 
         DistributedLogManager dlm = createNewDLM(confLocal, name);
         BKSyncLogWriter writer = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned();
@@ -368,7 +371,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
         }
 
         BKLogSegmentWriter perStreamWriter = writer.segmentWriter;
-        BookKeeperClient bkc = readDLM.getReaderBKC();
+        BookKeeperClient bkc = DLMTestUtil.getBookKeeperClient(readDLM);
         LedgerHandle readLh = bkc.get().openLedgerNoRecovery(getLedgerHandle(perStreamWriter).getId(),
                 BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8));
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/acl/TestZKAccessControl.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/acl/TestZKAccessControl.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/acl/TestZKAccessControl.java
index ee2968d..ff924f8 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/acl/TestZKAccessControl.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/acl/TestZKAccessControl.java
@@ -20,6 +20,7 @@ package com.twitter.distributedlog.acl;
 import com.twitter.distributedlog.TestZooKeeperClientBuilder;
 import com.twitter.distributedlog.ZooKeeperClient;
 import com.twitter.distributedlog.ZooKeeperClusterTestCase;
+import com.twitter.distributedlog.impl.acl.ZKAccessControl;
 import com.twitter.distributedlog.thrift.AccessControlEntry;
 import com.twitter.util.Await;
 import org.apache.zookeeper.CreateMode;
@@ -103,7 +104,7 @@ public class TestZKAccessControl extends ZooKeeperClusterTestCase {
 
         ZKAccessControl readZKAC = Await.result(ZKAccessControl.read(zkc, zkPath, null));
 
-        assertEquals(zkPath, readZKAC.zkPath);
+        assertEquals(zkPath, readZKAC.getZKPath());
         assertEquals(ZKAccessControl.DEFAULT_ACCESS_CONTROL_ENTRY, readZKAC.getAccessControlEntry());
         assertTrue(ZKAccessControl.DEFAULT_ACCESS_CONTROL_ENTRY == readZKAC.getAccessControlEntry());
     }
@@ -145,7 +146,7 @@ public class TestZKAccessControl extends ZooKeeperClusterTestCase {
         } catch (KeeperException.BadVersionException bve) {
             // expected
         }
-        readZKAC2.accessControlEntry.setDenyTruncate(true);
+        readZKAC2.getAccessControlEntry().setDenyTruncate(true);
         Await.result(readZKAC2.update(zkc));
         ZKAccessControl readZKAC3 = Await.result(ZKAccessControl.read(zkc, zkPath, null));
         assertEquals(readZKAC2, readZKAC3);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/acl/TestZKAccessControlManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/acl/TestZKAccessControlManager.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/acl/TestZKAccessControlManager.java
index 8ba82f5..5625306 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/acl/TestZKAccessControlManager.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/acl/TestZKAccessControlManager.java
@@ -22,6 +22,8 @@ import com.twitter.distributedlog.TestZooKeeperClientBuilder;
 import com.twitter.distributedlog.ZooKeeperClient;
 import com.twitter.distributedlog.ZooKeeperClientUtils;
 import com.twitter.distributedlog.ZooKeeperClusterTestCase;
+import com.twitter.distributedlog.impl.acl.ZKAccessControl;
+import com.twitter.distributedlog.impl.acl.ZKAccessControlManager;
 import com.twitter.distributedlog.thrift.AccessControlEntry;
 import com.twitter.util.Await;
 import org.junit.After;



Mime
View raw message