bookkeeper-distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [23/31] incubator-distributedlog git commit: DL-163: clean up direct zookeeper and bookkeeper usage and use metadata/data store abstraction
Date Fri, 30 Dec 2016 00:07:37 GMT
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java
index d1069c3..60bc420 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java
@@ -28,6 +28,8 @@ import com.twitter.distributedlog.TestZooKeeperClientBuilder;
 import com.twitter.distributedlog.ZooKeeperClient;
 import com.twitter.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater;
 import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.SchedulerUtils;
 import org.apache.zookeeper.CreateMode;
@@ -103,8 +105,10 @@ public class TestDLCK extends TestDistributedLogBase {
         confLocal.setLogSegmentCacheEnabled(false);
         URI uri = createDLMURI("/check-and-repair-dl-namespace");
         zkc.get().create(uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        com.twitter.distributedlog.DistributedLogManagerFactory factory =
-                new com.twitter.distributedlog.DistributedLogManagerFactory(confLocal, uri);
+        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+                .conf(confLocal)
+                .uri(uri)
+                .build();
         OrderedScheduler scheduler = OrderedScheduler.newBuilder()
                 .name("dlck-tool")
                 .corePoolSize(1)
@@ -114,17 +118,20 @@ public class TestDLCK extends TestDistributedLogBase {
         String streamName = "check-and-repair-dl-namespace";
 
         // Create completed log segments
-        DistributedLogManager dlm = factory.createDistributedLogManagerWithSharedClients(streamName);
+        DistributedLogManager dlm = namespace.openLog(streamName);
         DLMTestUtil.injectLogSegmentWithLastDLSN(dlm, confLocal, 1L, 1L, 10, false);
         DLMTestUtil.injectLogSegmentWithLastDLSN(dlm, confLocal, 2L, 11L, 10, true);
         DLMTestUtil.injectLogSegmentWithLastDLSN(dlm, confLocal, 3L, 21L, 10, false);
         DLMTestUtil.injectLogSegmentWithLastDLSN(dlm, confLocal, 4L, 31L, 10, true);
 
         // dryrun
-        BookKeeperClient bkc = getBookKeeperClient(factory);
-        DistributedLogAdmin.checkAndRepairDLNamespace(uri, factory,
-                new DryrunLogSegmentMetadataStoreUpdater(confLocal, getLogSegmentMetadataStore(factory)),
-                scheduler, bkc, confLocal.getBKDigestPW(), false, false);
+        DistributedLogAdmin.checkAndRepairDLNamespace(
+                uri,
+                namespace,
+                new DryrunLogSegmentMetadataStoreUpdater(confLocal, getLogSegmentMetadataStore(namespace)),
+                scheduler,
+                false,
+                false);
 
         Map<Long, LogSegmentMetadata> segments = getLogSegments(dlm);
         LOG.info("segments after drynrun {}", segments);
@@ -134,10 +141,13 @@ public class TestDLCK extends TestDistributedLogBase {
         verifyLogSegment(segments, new DLSN(4L, 16L, 0L), 4L, 9, 39L);
 
         // check and repair
-        bkc = getBookKeeperClient(factory);
-        DistributedLogAdmin.checkAndRepairDLNamespace(uri, factory,
-                LogSegmentMetadataStoreUpdater.createMetadataUpdater(confLocal, getLogSegmentMetadataStore(factory)),
-                scheduler, bkc, confLocal.getBKDigestPW(), false, false);
+        DistributedLogAdmin.checkAndRepairDLNamespace(
+                uri,
+                namespace,
+                LogSegmentMetadataStoreUpdater.createMetadataUpdater(confLocal, getLogSegmentMetadataStore(namespace)),
+                scheduler,
+                false,
+                false);
 
         segments = getLogSegments(dlm);
         LOG.info("segments after repair {}", segments);
@@ -148,7 +158,7 @@ public class TestDLCK extends TestDistributedLogBase {
 
         dlm.close();
         SchedulerUtils.shutdownScheduler(executorService, 5, TimeUnit.MINUTES);
-        factory.close();
+        namespace.close();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDistributedLogAdmin.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDistributedLogAdmin.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDistributedLogAdmin.java
index 66d7228..1e39e49 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDistributedLogAdmin.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDistributedLogAdmin.java
@@ -24,6 +24,8 @@ import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.TestZooKeeperClientBuilder;
 import com.twitter.distributedlog.annotations.DistributedLogAnnotations;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
 import com.twitter.distributedlog.util.Utils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
@@ -90,21 +92,25 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase {
 
         URI uri = createDLMURI("/change-sequence-number");
         zooKeeperClient.get().create(uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        com.twitter.distributedlog.DistributedLogManagerFactory factory =
-                new com.twitter.distributedlog.DistributedLogManagerFactory(confLocal, uri);
-        com.twitter.distributedlog.DistributedLogManagerFactory readFactory =
-                new com.twitter.distributedlog.DistributedLogManagerFactory(readConf, uri);
+        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+                .conf(confLocal)
+                .uri(uri)
+                .build();
+        DistributedLogNamespace readNamespace = DistributedLogNamespaceBuilder.newBuilder()
+                .conf(readConf)
+                .uri(uri)
+                .build();
 
         String streamName = "change-sequence-number";
 
         // create completed log segments
-        DistributedLogManager dlm = factory.createDistributedLogManagerWithSharedClients(streamName);
+        DistributedLogManager dlm = namespace.openLog(streamName);
         DLMTestUtil.generateCompletedLogSegments(dlm, confLocal, 4, 10);
         DLMTestUtil.injectLogSegmentWithGivenLogSegmentSeqNo(dlm, confLocal, 5, 41, false, 10, true);
         dlm.close();
 
         // create a reader
-        DistributedLogManager readDLM = readFactory.createDistributedLogManagerWithSharedClients(streamName);
+        DistributedLogManager readDLM = readNamespace.openLog(streamName);
         AsyncLogReader reader = readDLM.getAsyncLogReader(DLSN.InitialDLSN);
 
         // read the records
@@ -121,7 +127,7 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase {
 
         LOG.info("Injecting bad log segment '3'");
 
-        dlm = factory.createDistributedLogManagerWithSharedClients(streamName);
+        dlm = namespace.openLog(streamName);
         DLMTestUtil.injectLogSegmentWithGivenLogSegmentSeqNo(dlm, confLocal, 3L, 5 * 10 + 1, true, 10, false);
 
         LOG.info("Injected bad log segment '3'");
@@ -140,8 +146,8 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase {
         LOG.info("Dryrun fix inprogress segment that has lower sequence number");
 
         // Dryrun
-        DistributedLogAdmin.fixInprogressSegmentWithLowerSequenceNumber(factory,
-                new DryrunLogSegmentMetadataStoreUpdater(confLocal, getLogSegmentMetadataStore(factory)), streamName, false, false);
+        DistributedLogAdmin.fixInprogressSegmentWithLowerSequenceNumber(namespace,
+                new DryrunLogSegmentMetadataStoreUpdater(confLocal, getLogSegmentMetadataStore(namespace)), streamName, false, false);
 
         try {
             reader = readDLM.getAsyncLogReader(lastDLSN);
@@ -154,8 +160,8 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase {
         LOG.info("Actual run fix inprogress segment that has lower sequence number");
 
         // Actual run
-        DistributedLogAdmin.fixInprogressSegmentWithLowerSequenceNumber(factory,
-                LogSegmentMetadataStoreUpdater.createMetadataUpdater(confLocal, getLogSegmentMetadataStore(factory)), streamName, false, false);
+        DistributedLogAdmin.fixInprogressSegmentWithLowerSequenceNumber(namespace,
+                LogSegmentMetadataStoreUpdater.createMetadataUpdater(confLocal, getLogSegmentMetadataStore(namespace)), streamName, false, false);
 
         // be able to read more after fix
         reader = readDLM.getAsyncLogReader(lastDLSN);
@@ -182,7 +188,7 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase {
         readDLM.close();
 
         dlm.close();
-        factory.close();
-        readFactory.close();
+        namespace.close();
+        readNamespace.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
index 46e8af0..de7016a 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
@@ -531,7 +531,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
                 children, firstSegmentList);
 
         ZooKeeperClientUtils.expireSession(zkc,
-                DLUtils.getZKServersFromDLUri(uri), conf.getZKSessionTimeoutMilliseconds());
+                BKNamespaceDriver.getZKServersFromDLUri(uri), conf.getZKSessionTimeoutMilliseconds());
 
         logger.info("Create another {} segments.", numSegments);
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKNamespaceWatcher.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKNamespaceWatcher.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKNamespaceWatcher.java
index da9f577..c9a2e5b 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKNamespaceWatcher.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKNamespaceWatcher.java
@@ -174,7 +174,7 @@ public class TestZKNamespaceWatcher extends TestDistributedLogBase {
         createLogInNamespace(uri, "test2");
         latches[2].await();
         assertEquals(2, receivedLogs.get().size());
-        ZooKeeperClientUtils.expireSession(zkc, DLUtils.getZKServersFromDLUri(uri), zkSessionTimeoutMs);
+        ZooKeeperClientUtils.expireSession(zkc, BKNamespaceDriver.getZKServersFromDLUri(uri), zkSessionTimeoutMs);
         latches[3].await();
         assertEquals(2, receivedLogs.get().size());
         createLogInNamespace(uri, "test3");

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java
index 673d856..0ce9f46 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java
@@ -30,6 +30,7 @@ import com.twitter.distributedlog.ZooKeeperClientUtils;
 import com.twitter.distributedlog.callback.NamespaceListener;
 import com.twitter.distributedlog.exceptions.LogExistsException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
+import com.twitter.distributedlog.impl.BKNamespaceDriver;
 import com.twitter.distributedlog.metadata.LogMetadataStore;
 import com.twitter.distributedlog.util.DLUtils;
 import com.twitter.distributedlog.util.FutureUtils;
@@ -422,7 +423,7 @@ public class TestFederatedZKLogMetadataStore extends TestDistributedLogBase {
         TestNamespaceListenerWithExpectedSize listener =
                 new TestNamespaceListenerWithExpectedSize(2 * maxLogsPerSubnamespace + 1);
         metadataStore.registerNamespaceListener(listener);
-        ZooKeeperClientUtils.expireSession(zkc, DLUtils.getZKServersFromDLUri(uri), zkSessionTimeoutMs);
+        ZooKeeperClientUtils.expireSession(zkc, BKNamespaceDriver.getZKServersFromDLUri(uri), zkSessionTimeoutMs);
         String testLogName = "test-log-name";
         allLogs.add(testLogName);
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
index 4cf86fa..183a405 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
@@ -30,10 +30,13 @@ import com.twitter.distributedlog.LogRecord;
 import com.twitter.distributedlog.LogRecordWithDLSN;
 import com.twitter.distributedlog.LogSegmentMetadata;
 import com.twitter.distributedlog.TestDistributedLogBase;
+import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.ZooKeeperClientBuilder;
 import com.twitter.distributedlog.exceptions.EndOfLogSegmentException;
 import com.twitter.distributedlog.exceptions.ReadCancelledException;
 import com.twitter.distributedlog.injector.AsyncFailureInjector;
 import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
+import com.twitter.distributedlog.util.ConfUtils;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.Utils;
@@ -59,10 +62,17 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase {
     public TestName runtime = new TestName();
     private OrderedScheduler scheduler;
     private BookKeeperClient bkc;
+    private ZooKeeperClient zkc;
 
     @Before
     public void setup() throws Exception {
         super.setup();
+        zkc = ZooKeeperClientBuilder.newBuilder()
+                .name("test-zk")
+                .zkServers(bkutil.getZkServers())
+                .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
+                .zkAclId(conf.getZkAclId())
+                .build();
         bkc = BookKeeperClientBuilder.newBuilder()
                 .name("test-bk")
                 .dlConfig(conf)
@@ -83,6 +93,9 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase {
         if (null != scheduler) {
             scheduler.shutdown();
         }
+        if (null != zkc) {
+            zkc.close();
+        }
         super.teardown();
     }
 
@@ -91,7 +104,14 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase {
                                               DistributedLogConfiguration conf)
             throws Exception {
         LogSegmentEntryStore store = new BKLogSegmentEntryStore(
-                conf, bkc, scheduler, NullStatsLogger.INSTANCE, AsyncFailureInjector.NULL);
+                conf,
+                ConfUtils.getConstDynConf(conf),
+                zkc,
+                bkc,
+                scheduler,
+                null,
+                NullStatsLogger.INSTANCE,
+                AsyncFailureInjector.NULL);
         return (BKLogSegmentEntryReader) FutureUtils.result(store.openReader(segment, startEntryId));
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
index 41544d6..1b19b2e 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
@@ -17,16 +17,16 @@
  */
 package com.twitter.distributedlog.impl.metadata;
 
-import com.twitter.distributedlog.TestZooKeeperClientBuilder;
-import com.twitter.distributedlog.metadata.BKDLConfig;
-import com.twitter.distributedlog.metadata.DLMetadata;
 import com.google.common.collect.Lists;
 import com.twitter.distributedlog.DLMTestUtil;
 import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.MetadataAccessor;
+import com.twitter.distributedlog.TestZooKeeperClientBuilder;
+import com.twitter.distributedlog.impl.metadata.BKDLConfig;
+import com.twitter.distributedlog.metadata.DLMetadata;
 import com.twitter.distributedlog.metadata.LogMetadataForWriter;
 import com.twitter.distributedlog.namespace.DistributedLogNamespace;
 import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import com.twitter.distributedlog.DistributedLogManager;
 import com.twitter.distributedlog.DistributedLogConstants;
 import com.twitter.distributedlog.exceptions.LogNotFoundException;
 import com.twitter.distributedlog.ZooKeeperClient;
@@ -317,9 +317,9 @@ public class TestZKLogStreamMetadataStore extends ZooKeeperClusterTestCase {
             .uri(uri)
             .build();
 
-        DistributedLogManager dlm = namespace.openLog(logName);
-        dlm.createOrUpdateMetadata(logName.getBytes("UTF-8"));
-        dlm.close();
+        MetadataAccessor accessor = namespace.getNamespaceDriver().getMetadataAccessor(logName);
+        accessor.createOrUpdateMetadata(logName.getBytes("UTF-8"));
+        accessor.close();
 
         testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, false);
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZkMetadataResolver.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZkMetadataResolver.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZkMetadataResolver.java
new file mode 100644
index 0000000..bbabbb2
--- /dev/null
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZkMetadataResolver.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.impl.metadata;
+
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.DistributedLogConstants;
+import com.twitter.distributedlog.TestZooKeeperClientBuilder;
+import com.twitter.distributedlog.impl.metadata.BKDLConfig;
+import com.twitter.distributedlog.impl.metadata.ZkMetadataResolver;
+import com.twitter.distributedlog.metadata.DLMetadata;
+import com.twitter.distributedlog.util.Utils;
+import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.ZooKeeperClientBuilder;
+import com.twitter.distributedlog.ZooKeeperClusterTestCase;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestZkMetadataResolver extends ZooKeeperClusterTestCase {
+
+    private static final BKDLConfig bkdlConfig = new BKDLConfig("127.0.0.1:7000", "ledgers");
+    private static final BKDLConfig bkdlConfig2 = new BKDLConfig("127.0.0.1:7000", "ledgers2");
+
+    private ZooKeeperClient zkc;
+    private ZkMetadataResolver resolver;
+
+    @Before
+    public void setup() throws Exception {
+        zkc = TestZooKeeperClientBuilder.newBuilder()
+                .uri(createURI("/"))
+                .sessionTimeoutMs(10000)
+                .build();
+        resolver = new ZkMetadataResolver(zkc);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        zkc.close();
+    }
+
+    private URI createURI(String path) {
+        return URI.create("distributedlog://127.0.0.1:" + zkPort + path);
+    }
+
+    @Test(timeout = 60000)
+    public void testResolveFailures() throws Exception {
+        // resolve unexisted path
+        try {
+            resolver.resolve(createURI("/unexisted/path"));
+            fail("Should fail if no metadata resolved.");
+        } catch (IOException e) {
+            // expected
+        }
+        // resolve existed unbound path
+        Utils.zkCreateFullPathOptimistic(zkc, "/existed/path", new byte[0],
+                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        try {
+            resolver.resolve(createURI("/existed/path"));
+            fail("Should fail if no metadata resolved.");
+        } catch (IOException e) {
+            // expected
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testResolve() throws Exception {
+        DLMetadata dlMetadata = DLMetadata.create(bkdlConfig);
+        dlMetadata.create(createURI("/messaging/distributedlog-testresolve"));
+        DLMetadata dlMetadata2 = DLMetadata.create(bkdlConfig2);
+        dlMetadata2.create(createURI("/messaging/distributedlog-testresolve/child"));
+        assertEquals(dlMetadata,
+                resolver.resolve(createURI("/messaging/distributedlog-testresolve")));
+        assertEquals(dlMetadata2,
+                resolver.resolve(createURI("/messaging/distributedlog-testresolve/child")));
+        assertEquals(dlMetadata2,
+                resolver.resolve(createURI("/messaging/distributedlog-testresolve/child/unknown")));
+        Utils.zkCreateFullPathOptimistic(zkc, "/messaging/distributedlog-testresolve/child/child2", new byte[0],
+                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        assertEquals(dlMetadata2,
+                resolver.resolve(createURI("/messaging/distributedlog-testresolve/child/child2")));
+    }
+
+    @Test(timeout = 60000)
+    public void testEncodeRegionID() throws Exception {
+        DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
+
+        URI uri = createURI("/messaging/distributedlog-testencoderegionid/dl1");
+        DLMetadata meta1 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers"));
+        meta1.create(uri);
+        BKDLConfig read1 = BKDLConfig.resolveDLConfig(zkc, uri);
+        BKDLConfig.propagateConfiguration(read1, dlConf);
+        assertFalse(dlConf.getEncodeRegionIDInLogSegmentMetadata());
+
+        BKDLConfig.clearCachedDLConfigs();
+
+        DLMetadata meta2 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers").setEncodeRegionID(true));
+        meta2.update(uri);
+        BKDLConfig read2 = BKDLConfig.resolveDLConfig(zkc, uri);
+        BKDLConfig.propagateConfiguration(read2, dlConf);
+        assertTrue(dlConf.getEncodeRegionIDInLogSegmentMetadata());
+
+        BKDLConfig.clearCachedDLConfigs();
+
+        DLMetadata meta3 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers").setEncodeRegionID(false));
+        meta3.update(uri);
+        BKDLConfig read3 = BKDLConfig.resolveDLConfig(zkc, uri);
+        BKDLConfig.propagateConfiguration(read3, dlConf);
+        assertFalse(dlConf.getEncodeRegionIDInLogSegmentMetadata());
+
+        BKDLConfig.clearCachedDLConfigs();
+    }
+
+    @Test(timeout = 60000)
+    public void testFirstLogSegmentSequenceNumber() throws Exception {
+        DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
+
+        URI uri = createURI("/messaging/distributedlog-testfirstledgerseqno/dl1");
+        DLMetadata meta1 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers"));
+        meta1.create(uri);
+        BKDLConfig read1 = BKDLConfig.resolveDLConfig(zkc, uri);
+        BKDLConfig.propagateConfiguration(read1, dlConf);
+        assertEquals(DistributedLogConstants.FIRST_LOGSEGMENT_SEQNO, dlConf.getFirstLogSegmentSequenceNumber());
+
+        BKDLConfig.clearCachedDLConfigs();
+
+        DLMetadata meta2 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers")
+                .setFirstLogSegmentSeqNo(9999L));
+        meta2.update(uri);
+        BKDLConfig read2 = BKDLConfig.resolveDLConfig(zkc, uri);
+        BKDLConfig.propagateConfiguration(read2, dlConf);
+        assertEquals(9999L, dlConf.getFirstLogSegmentSequenceNumber());
+
+        BKDLConfig.clearCachedDLConfigs();
+
+        DLMetadata meta3 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers")
+                .setFirstLogSegmentSeqNo(99L));
+        meta3.update(uri);
+        BKDLConfig read3 = BKDLConfig.resolveDLConfig(zkc, uri);
+        BKDLConfig.propagateConfiguration(read3, dlConf);
+        assertEquals(99L, dlConf.getFirstLogSegmentSequenceNumber());
+
+        BKDLConfig.clearCachedDLConfigs();
+    }
+
+    @Test(timeout = 60000)
+    public void testFederatedNamespace() throws Exception {
+        DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
+
+        URI uri = createURI("/messaging/distributedlog-testfederatednamespace/dl1");
+        DLMetadata meta1 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers"));
+        meta1.create(uri);
+        BKDLConfig read1 = BKDLConfig.resolveDLConfig(zkc, uri);
+        BKDLConfig.propagateConfiguration(read1, dlConf);
+        assertTrue(dlConf.getCreateStreamIfNotExists());
+
+        BKDLConfig.clearCachedDLConfigs();
+
+        DLMetadata meta2 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers")
+                .setFederatedNamespace(true));
+        meta2.update(uri);
+        BKDLConfig read2 = BKDLConfig.resolveDLConfig(zkc, uri);
+        BKDLConfig.propagateConfiguration(read2, dlConf);
+        assertFalse(dlConf.getCreateStreamIfNotExists());
+
+        BKDLConfig.clearCachedDLConfigs();
+
+        DLMetadata meta3 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers")
+                .setFederatedNamespace(false));
+        meta3.update(uri);
+        BKDLConfig read3 = BKDLConfig.resolveDLConfig(zkc, uri);
+        BKDLConfig.propagateConfiguration(read3, dlConf);
+        // if it is non-federated namespace, it won't change the create stream behavior.
+        assertFalse(dlConf.getCreateStreamIfNotExists());
+
+        BKDLConfig.clearCachedDLConfigs();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestDLMetadata.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestDLMetadata.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestDLMetadata.java
index d4c2f31..e3cc239 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestDLMetadata.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestDLMetadata.java
@@ -19,6 +19,7 @@ package com.twitter.distributedlog.metadata;
 
 import com.twitter.distributedlog.LocalDLMEmulator;
 import com.twitter.distributedlog.ZooKeeperClusterTestCase;
+import com.twitter.distributedlog.impl.metadata.BKDLConfig;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 import org.junit.After;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestZkMetadataResolver.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestZkMetadataResolver.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestZkMetadataResolver.java
deleted file mode 100644
index 79fb539..0000000
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/metadata/TestZkMetadataResolver.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.metadata;
-
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.TestZooKeeperClientBuilder;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.ZooKeeperClientBuilder;
-import com.twitter.distributedlog.ZooKeeperClusterTestCase;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.net.URI;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class TestZkMetadataResolver extends ZooKeeperClusterTestCase {
-
-    private static final BKDLConfig bkdlConfig = new BKDLConfig("127.0.0.1:7000", "ledgers");
-    private static final BKDLConfig bkdlConfig2 = new BKDLConfig("127.0.0.1:7000", "ledgers2");
-
-    private ZooKeeperClient zkc;
-    private ZkMetadataResolver resolver;
-
-    @Before
-    public void setup() throws Exception {
-        zkc = TestZooKeeperClientBuilder.newBuilder()
-                .uri(createURI("/"))
-                .sessionTimeoutMs(10000)
-                .build();
-        resolver = new ZkMetadataResolver(zkc);
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        zkc.close();
-    }
-
-    private URI createURI(String path) {
-        return URI.create("distributedlog://127.0.0.1:" + zkPort + path);
-    }
-
-    @Test(timeout = 60000)
-    public void testResolveFailures() throws Exception {
-        // resolve unexisted path
-        try {
-            resolver.resolve(createURI("/unexisted/path"));
-            fail("Should fail if no metadata resolved.");
-        } catch (IOException e) {
-            // expected
-        }
-        // resolve existed unbound path
-        Utils.zkCreateFullPathOptimistic(zkc, "/existed/path", new byte[0],
-                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        try {
-            resolver.resolve(createURI("/existed/path"));
-            fail("Should fail if no metadata resolved.");
-        } catch (IOException e) {
-            // expected
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testResolve() throws Exception {
-        DLMetadata dlMetadata = DLMetadata.create(bkdlConfig);
-        dlMetadata.create(createURI("/messaging/distributedlog-testresolve"));
-        DLMetadata dlMetadata2 = DLMetadata.create(bkdlConfig2);
-        dlMetadata2.create(createURI("/messaging/distributedlog-testresolve/child"));
-        assertEquals(dlMetadata,
-                resolver.resolve(createURI("/messaging/distributedlog-testresolve")));
-        assertEquals(dlMetadata2,
-                resolver.resolve(createURI("/messaging/distributedlog-testresolve/child")));
-        assertEquals(dlMetadata2,
-                resolver.resolve(createURI("/messaging/distributedlog-testresolve/child/unknown")));
-        Utils.zkCreateFullPathOptimistic(zkc, "/messaging/distributedlog-testresolve/child/child2", new byte[0],
-                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        assertEquals(dlMetadata2,
-                resolver.resolve(createURI("/messaging/distributedlog-testresolve/child/child2")));
-    }
-
-    @Test(timeout = 60000)
-    public void testEncodeRegionID() throws Exception {
-        DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
-
-        URI uri = createURI("/messaging/distributedlog-testencoderegionid/dl1");
-        DLMetadata meta1 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers"));
-        meta1.create(uri);
-        BKDLConfig read1 = BKDLConfig.resolveDLConfig(zkc, uri);
-        BKDLConfig.propagateConfiguration(read1, dlConf);
-        assertFalse(dlConf.getEncodeRegionIDInLogSegmentMetadata());
-
-        BKDLConfig.clearCachedDLConfigs();
-
-        DLMetadata meta2 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers").setEncodeRegionID(true));
-        meta2.update(uri);
-        BKDLConfig read2 = BKDLConfig.resolveDLConfig(zkc, uri);
-        BKDLConfig.propagateConfiguration(read2, dlConf);
-        assertTrue(dlConf.getEncodeRegionIDInLogSegmentMetadata());
-
-        BKDLConfig.clearCachedDLConfigs();
-
-        DLMetadata meta3 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers").setEncodeRegionID(false));
-        meta3.update(uri);
-        BKDLConfig read3 = BKDLConfig.resolveDLConfig(zkc, uri);
-        BKDLConfig.propagateConfiguration(read3, dlConf);
-        assertFalse(dlConf.getEncodeRegionIDInLogSegmentMetadata());
-
-        BKDLConfig.clearCachedDLConfigs();
-    }
-
-    @Test(timeout = 60000)
-    public void testFirstLogSegmentSequenceNumber() throws Exception {
-        DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
-
-        URI uri = createURI("/messaging/distributedlog-testfirstledgerseqno/dl1");
-        DLMetadata meta1 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers"));
-        meta1.create(uri);
-        BKDLConfig read1 = BKDLConfig.resolveDLConfig(zkc, uri);
-        BKDLConfig.propagateConfiguration(read1, dlConf);
-        assertEquals(DistributedLogConstants.FIRST_LOGSEGMENT_SEQNO, dlConf.getFirstLogSegmentSequenceNumber());
-
-        BKDLConfig.clearCachedDLConfigs();
-
-        DLMetadata meta2 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers")
-                .setFirstLogSegmentSeqNo(9999L));
-        meta2.update(uri);
-        BKDLConfig read2 = BKDLConfig.resolveDLConfig(zkc, uri);
-        BKDLConfig.propagateConfiguration(read2, dlConf);
-        assertEquals(9999L, dlConf.getFirstLogSegmentSequenceNumber());
-
-        BKDLConfig.clearCachedDLConfigs();
-
-        DLMetadata meta3 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers")
-                .setFirstLogSegmentSeqNo(99L));
-        meta3.update(uri);
-        BKDLConfig read3 = BKDLConfig.resolveDLConfig(zkc, uri);
-        BKDLConfig.propagateConfiguration(read3, dlConf);
-        assertEquals(99L, dlConf.getFirstLogSegmentSequenceNumber());
-
-        BKDLConfig.clearCachedDLConfigs();
-    }
-
-    @Test(timeout = 60000)
-    public void testFederatedNamespace() throws Exception {
-        DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
-
-        URI uri = createURI("/messaging/distributedlog-testfederatednamespace/dl1");
-        DLMetadata meta1 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers"));
-        meta1.create(uri);
-        BKDLConfig read1 = BKDLConfig.resolveDLConfig(zkc, uri);
-        BKDLConfig.propagateConfiguration(read1, dlConf);
-        assertTrue(dlConf.getCreateStreamIfNotExists());
-
-        BKDLConfig.clearCachedDLConfigs();
-
-        DLMetadata meta2 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers")
-                .setFederatedNamespace(true));
-        meta2.update(uri);
-        BKDLConfig read2 = BKDLConfig.resolveDLConfig(zkc, uri);
-        BKDLConfig.propagateConfiguration(read2, dlConf);
-        assertFalse(dlConf.getCreateStreamIfNotExists());
-
-        BKDLConfig.clearCachedDLConfigs();
-
-        DLMetadata meta3 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers")
-                .setFederatedNamespace(false));
-        meta3.update(uri);
-        BKDLConfig read3 = BKDLConfig.resolveDLConfig(zkc, uri);
-        BKDLConfig.propagateConfiguration(read3, dlConf);
-        // if it is non-federated namespace, it won't change the create stream behavior.
-        assertFalse(dlConf.getCreateStreamIfNotExists());
-
-        BKDLConfig.clearCachedDLConfigs();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
index 3225ced..a2a0ca6 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
@@ -20,7 +20,7 @@ package com.twitter.distributedlog.service;
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.LocalDLMEmulator;
 import com.twitter.distributedlog.client.routing.SingleHostRoutingService;
-import com.twitter.distributedlog.metadata.BKDLConfig;
+import com.twitter.distributedlog.impl.metadata.BKDLConfig;
 import com.twitter.distributedlog.metadata.DLMetadata;
 import com.twitter.distributedlog.service.placement.EqualLoadAppraiser;
 import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java
index cfb5b8d..2e49d92 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java
@@ -23,12 +23,12 @@ import com.google.common.util.concurrent.RateLimiter;
 import com.twitter.common.zookeeper.ServerSet;
 import com.twitter.distributedlog.client.monitor.MonitorServiceClient;
 import com.twitter.distributedlog.client.serverset.DLZkServerSet;
+import com.twitter.distributedlog.impl.BKNamespaceDriver;
 import com.twitter.distributedlog.service.ClientUtils;
 import com.twitter.distributedlog.service.DLSocketAddress;
 import com.twitter.distributedlog.service.DistributedLogClient;
 import com.twitter.distributedlog.service.DistributedLogClientBuilder;
 import com.twitter.distributedlog.tools.Tool;
-import com.twitter.distributedlog.util.DLUtils;
 import com.twitter.finagle.builder.ClientBuilder;
 import com.twitter.finagle.thrift.ClientId$;
 import com.twitter.util.Await;
@@ -269,8 +269,8 @@ public class BalancerTool extends Tool {
                         ClientUtils.buildClient(builder2);
                 try {
                     SimpleBalancer balancer = new SimpleBalancer(
-                            DLUtils.getZKServersFromDLUri(region1), pair1.getLeft(), pair1.getRight(),
-                            DLUtils.getZKServersFromDLUri(region2), pair2.getLeft(), pair2.getRight());
+                            BKNamespaceDriver.getZKServersFromDLUri(region1), pair1.getLeft(), pair1.getRight(),
+                            BKNamespaceDriver.getZKServersFromDLUri(region2), pair2.getLeft(), pair2.getRight());
                     try {
                         return runBalancer(balancer);
                     } finally {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java
index 18b9d1f..4f01bdc 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
 import com.twitter.distributedlog.BKDistributedLogNamespace;
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.util.DLUtils;
+import com.twitter.distributedlog.impl.BKNamespaceDriver;
 import com.twitter.distributedlog.util.Utils;
 
 /**
@@ -55,11 +55,12 @@ public class ZKPlacementStateManager implements PlacementStateManager {
   private boolean watching = false;
 
   public ZKPlacementStateManager(URI uri, DistributedLogConfiguration conf, StatsLogger statsLogger) {
-    zkClient = BKDistributedLogNamespace.createDLZKClientBuilder(
-        String.format("dlzk:%s:factory_writer_shared", uri),
+    String zkServers = BKNamespaceDriver.getZKServersFromDLUri(uri);
+    zkClient = BKNamespaceDriver.createZKClientBuilder(
+        String.format("ZKPlacementStateManager-%s", zkServers),
         conf,
-        DLUtils.getZKServersFromDLUri(uri),
-        statsLogger.scope("dlzk_factory_writer_shared")).build();
+        zkServers,
+        statsLogger.scope("placement_state_manager")).build();
     serverLoadPath = uri.getPath() + SERVER_LOAD_DIR;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java
index c45e42c..218ea06 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java
@@ -30,10 +30,10 @@ import com.twitter.distributedlog.LogRecord;
 import com.twitter.distributedlog.LogRecordWithDLSN;
 import com.twitter.distributedlog.ZooKeeperClient;
 import com.twitter.distributedlog.acl.AccessControlManager;
-import com.twitter.distributedlog.acl.ZKAccessControl;
+import com.twitter.distributedlog.impl.acl.ZKAccessControl;
 import com.twitter.distributedlog.client.routing.LocalRoutingService;
 import com.twitter.distributedlog.exceptions.DLException;
-import com.twitter.distributedlog.metadata.BKDLConfig;
+import com.twitter.distributedlog.impl.metadata.BKDLConfig;
 import com.twitter.distributedlog.namespace.DistributedLogNamespace;
 import com.twitter.distributedlog.service.stream.StreamManagerImpl;
 import com.twitter.distributedlog.thrift.AccessControlEntry;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/DistributedLogInputFormat.java
----------------------------------------------------------------------
diff --git a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/DistributedLogInputFormat.java b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/DistributedLogInputFormat.java
index 20c81f3..87ddec0 100644
--- a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/DistributedLogInputFormat.java
+++ b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/DistributedLogInputFormat.java
@@ -18,12 +18,14 @@
 package com.twitter.distributedlog.mapreduce;
 
 import com.google.common.collect.Lists;
-import com.twitter.distributedlog.BKDistributedLogNamespace;
-import com.twitter.distributedlog.DLSN;
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.DistributedLogManager;
+import com.twitter.distributedlog.DLSN;
 import com.twitter.distributedlog.LogRecordWithDLSN;
 import com.twitter.distributedlog.LogSegmentMetadata;
+import com.twitter.distributedlog.impl.BKNamespaceDriver;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeperAccessor;
@@ -57,7 +59,7 @@ public class DistributedLogInputFormat
     protected Configuration conf;
     protected DistributedLogConfiguration dlConf;
     protected URI dlUri;
-    protected BKDistributedLogNamespace namespace;
+    protected DistributedLogNamespace namespace;
     protected String streamName;
     protected DistributedLogManager dlm;
 
@@ -69,7 +71,7 @@ public class DistributedLogInputFormat
         dlUri = URI.create(configuration.get(DL_URI, ""));
         streamName = configuration.get(DL_STREAM, "");
         try {
-            namespace = BKDistributedLogNamespace.newBuilder()
+            namespace = DistributedLogNamespaceBuilder.newBuilder()
                     .conf(dlConf)
                     .uri(dlUri)
                     .build();
@@ -89,7 +91,7 @@ public class DistributedLogInputFormat
             throws IOException, InterruptedException {
         List<LogSegmentMetadata> segments = dlm.getLogSegments();
         List<InputSplit> inputSplits = Lists.newArrayListWithCapacity(segments.size());
-        BookKeeper bk = namespace.getReaderBKC().get();
+        BookKeeper bk = ((BKNamespaceDriver) namespace.getNamespaceDriver()).getReaderBKC().get();
         LedgerManager lm = BookKeeperAccessor.getLedgerManager(bk);
         final AtomicInteger rcHolder = new AtomicInteger(0);
         final AtomicReference<LedgerMetadata> metadataHolder = new AtomicReference<LedgerMetadata>(null);
@@ -121,7 +123,7 @@ public class DistributedLogInputFormat
         return new LogSegmentReader(
                 streamName,
                 dlConf,
-                namespace.getReaderBKC().get(),
+                ((BKNamespaceDriver) namespace.getNamespaceDriver()).getReaderBKC().get(),
                 (LogSegmentSplit) inputSplit);
     }
 }


Mime
View raw message