distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [10/20] incubator-distributedlog git commit: DL-103: DL client can use server-side routing service to retrieve ownerships for streams
Date Wed, 28 Dec 2016 01:05:18 GMT
DL-103: DL client can use server-side routing service to retrieve ownerships for streams

If the DL client is configured with a finagle name for the server-side routing service, the client uses that routing service to retrieve the ownerships of streams.


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/ddbd7716
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/ddbd7716
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/ddbd7716

Branch: refs/heads/master
Commit: ddbd7716a895f060a6764624f65ffa45420167b8
Parents: 16d73c3
Author: Sijie Guo <sijieg@twitter.com>
Authored: Thu Jul 28 22:02:09 2016 -0700
Committer: Sijie Guo <sijieg@twitter.com>
Committed: Tue Dec 27 16:49:27 2016 -0800

----------------------------------------------------------------------
 .../client/DistributedLogClientImpl.java        | 115 ++-
 .../client/proxy/ClusterClient.java             |  51 ++
 .../service/DistributedLogClientBuilder.java    |  85 ++-
 .../service/DistributedLogServiceImpl.java      |   4 +-
 .../service/DistributedLogServerTestCase.java   |  46 +-
 .../service/TestDistributedLogServer.java       | 730 -------------------
 .../service/TestDistributedLogServerBase.java   | 714 ++++++++++++++++++
 .../TestDistributedLogServerClientRouting.java  |  59 ++
 .../TestDistributedLogServerServerRouting.java  |  28 +
 .../service/TestRegionUnavailable.java          |   1 +
 .../service/balancer/TestClusterBalancer.java   |   3 +-
 .../service/balancer/TestSimpleBalancer.java    |   6 +-
 .../service/balancer/TestStreamMover.java       |   7 +-
 13 files changed, 1085 insertions(+), 764 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/ddbd7716/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java
index d3edb74..5125f28 100644
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java
+++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java
@@ -19,13 +19,14 @@ package com.twitter.distributedlog.client;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.twitter.distributedlog.DLSN;
 import com.twitter.distributedlog.LogRecordSetBuffer;
-import com.twitter.distributedlog.client.monitor.MonitorServiceClient;
 import com.twitter.distributedlog.client.ownership.OwnershipCache;
+import com.twitter.distributedlog.client.proxy.ClusterClient;
 import com.twitter.distributedlog.client.proxy.HostProvider;
 import com.twitter.distributedlog.client.proxy.ProxyClient;
 import com.twitter.distributedlog.client.proxy.ProxyClientManager;
@@ -38,6 +39,7 @@ import com.twitter.distributedlog.client.stats.OpStats;
 import com.twitter.distributedlog.exceptions.DLClientClosedException;
 import com.twitter.distributedlog.exceptions.DLException;
 import com.twitter.distributedlog.exceptions.ServiceUnavailableException;
+import com.twitter.distributedlog.exceptions.StreamUnavailableException;
 import com.twitter.distributedlog.service.DLSocketAddress;
 import com.twitter.distributedlog.service.DistributedLogClient;
 import com.twitter.distributedlog.thrift.service.BulkWriteResponse;
@@ -66,6 +68,8 @@ import com.twitter.util.Function0;
 import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
 import com.twitter.util.Promise;
+import com.twitter.util.Return;
+import com.twitter.util.Throw;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
@@ -118,6 +122,8 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe
     private final OwnershipCache ownershipCache;
     // Channel/Client management
     private final ProxyClientManager clientManager;
+    // Cluster Client (for routing service)
+    private final Optional<ClusterClient> clusterClient;
 
     // Close Status
     private boolean closed = false;
@@ -140,6 +146,16 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe
             this.opStats = opStats;
         }
 
+        boolean shouldTimeout() {
+            long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+            return shouldTimeout(elapsedMs);
+        }
+
+        boolean shouldTimeout(long elapsedMs) {
+            return clientConfig.getRequestTimeoutMs() > 0
+                && elapsedMs >= clientConfig.getRequestTimeoutMs();
+        }
+
         void send(SocketAddress address) {
             long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);
             if (clientConfig.getMaxRedirects() > 0
@@ -147,8 +163,7 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe
                 fail(address, new RequestTimeoutException(Duration.fromMilliseconds(elapsedMs),
                         "Exhausted max redirects in " + elapsedMs + " ms"));
                 return;
-            } else if (clientConfig.getRequestTimeoutMs() > 0
-                && elapsedMs >= clientConfig.getRequestTimeoutMs()) {
+            } else if (shouldTimeout(elapsedMs)) {
                 fail(address, new RequestTimeoutException(Duration.fromMilliseconds(elapsedMs),
                         "Exhausted max request timeout " + clientConfig.getRequestTimeoutMs()
                                 + " in " + elapsedMs + " ms"));
@@ -225,6 +240,7 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe
             // to go so large for other reasons though.
             this.results = new ArrayList<Promise<DLSN>>(data.size());
             for (int i = 0; i < data.size(); i++) {
+                Preconditions.checkNotNull(data.get(i));
                 this.results.add(new Promise<DLSN>());
             }
         }
@@ -549,6 +565,7 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe
                                     RoutingService routingService,
                                     ClientBuilder clientBuilder,
                                     ClientConfig clientConfig,
+                                    Optional<ClusterClient> clusterClient,
                                     StatsReceiver statsReceiver,
                                     StatsReceiver streamStatsReceiver,
                                     RegionResolver regionResolver,
@@ -579,6 +596,7 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe
                 this.dlTimer,       // timer
                 this,               // host provider
                 clientStats);       // client stats
+        this.clusterClient = clusterClient;
         this.clientManager.registerProxyListener(this);
 
         // Cache Stats
@@ -614,8 +632,10 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe
     @Override
     public Set<SocketAddress> getHosts() {
         Set<SocketAddress> hosts = Sets.newHashSet();
-        // use both routing service and ownership cache for the handshaking source
-        hosts.addAll(this.routingService.getHosts());
+        // if using server-side routing, we only handshake with the hosts in the ownership cache.
+        if (!clusterClient.isPresent()) {
+            hosts.addAll(this.routingService.getHosts());
+        }
         hosts.addAll(this.ownershipCache.getStreamOwnershipDistribution().keySet());
         return hosts;
     }
@@ -675,7 +695,12 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe
 
     @Override
     public void onServerJoin(SocketAddress address) {
-        clientManager.createClient(address);
+        // We only pre-create connections for client-side routing.
+        // With server-side routing, the exact proxy address is only known
+        // once it has been resolved via #getOwner.
+        if (!clusterClient.isPresent()) {
+            clientManager.createClient(address);
+        }
     }
 
     public void close() {
@@ -807,17 +832,77 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe
             op.routingContext.addTriedHost(previousAddr, StatusCode.WRITE_EXCEPTION);
         }
         // Get host first
-        SocketAddress address = ownershipCache.getOwner(op.stream);
+        final SocketAddress address = ownershipCache.getOwner(op.stream);
         if (null == address || op.routingContext.isTriedHost(address)) {
-            // pickup host by hashing
-            try {
-                address = routingService.getHost(op.stream, op.routingContext);
-            } catch (NoBrokersAvailableException nbae) {
-                op.fail(null, nbae);
-                return;
-            }
+            getOwner(op).addEventListener(new FutureEventListener<SocketAddress>() {
+                @Override
+                public void onFailure(Throwable cause) {
+                    op.fail(null, cause);
+                }
+
+                @Override
+                public void onSuccess(SocketAddress ownerAddr) {
+                    op.send(ownerAddr);
+                }
+            });
+        } else {
+            op.send(address);
+        }
+    }
+
+    private void retryGetOwnerFromRoutingServer(final StreamOp op,
+                                                final Promise<SocketAddress> getOwnerPromise,
+                                                final Throwable cause) {
+        if (op.shouldTimeout()) {
+            op.fail(null, cause);
+            return;
+        }
+        getOwnerFromRoutingServer(op, getOwnerPromise);
+    }
+
+    private void getOwnerFromRoutingServer(final StreamOp op,
+                                           final Promise<SocketAddress> getOwnerPromise) {
+        clusterClient.get().getService().getOwner(op.stream, op.ctx)
+            .addEventListener(new FutureEventListener<WriteResponse>() {
+                @Override
+                public void onFailure(Throwable cause) {
+                    getOwnerPromise.updateIfEmpty(new Throw<SocketAddress>(cause));
+                }
+
+                @Override
+                public void onSuccess(WriteResponse value) {
+                    if (StatusCode.FOUND == value.getHeader().getCode() &&
+                            null != value.getHeader().getLocation()) {
+                        SocketAddress addr;
+                        try {
+                             addr = DLSocketAddress.deserialize(value.getHeader().getLocation()).getSocketAddress();
+                        } catch (IOException e) {
+                            // retry from the routing server again
+                            retryGetOwnerFromRoutingServer(op, getOwnerPromise, e);
+                            return;
+                        }
+                        getOwnerPromise.updateIfEmpty(new Return<SocketAddress>(addr));
+                    } else {
+                        // retry from the routing server again
+                        retryGetOwnerFromRoutingServer(op, getOwnerPromise,
+                                new StreamUnavailableException("Stream " + op.stream + "'s owner is unknown"));
+                    }
+                }
+            });
+    }
+
+    private Future<SocketAddress> getOwner(final StreamOp op) {
+        if (clusterClient.isPresent()) {
+            final Promise<SocketAddress> getOwnerPromise = new Promise<SocketAddress>();
+            getOwnerFromRoutingServer(op, getOwnerPromise);
+            return getOwnerPromise;
+        }
+        // pickup host by hashing
+        try {
+            return Future.value(routingService.getHost(op.stream, op.routingContext));
+        } catch (NoBrokersAvailableException nbae) {
+            return Future.exception(nbae);
         }
-        op.send(address);
     }
 
     private void sendWriteRequest(final SocketAddress addr, final StreamOp op) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/ddbd7716/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ClusterClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ClusterClient.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ClusterClient.java
new file mode 100644
index 0000000..bb95a97
--- /dev/null
+++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ClusterClient.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.client.proxy;
+
+import com.twitter.distributedlog.thrift.service.DistributedLogService;
+import com.twitter.finagle.Service;
+import com.twitter.finagle.thrift.ThriftClientRequest;
+import com.twitter.util.Future;
+import scala.runtime.BoxedUnit;
+
+/**
+ * Cluster client
+ */
+public class ClusterClient {
+
+    private final Service<ThriftClientRequest, byte[]> client;
+    private final DistributedLogService.ServiceIface service;
+
+    public ClusterClient(Service<ThriftClientRequest, byte[]> client,
+                         DistributedLogService.ServiceIface service) {
+        this.client = client;
+        this.service = service;
+    }
+
+    public Service<ThriftClientRequest, byte[]> getClient() {
+        return client;
+    }
+
+    public DistributedLogService.ServiceIface getService() {
+        return service;
+    }
+
+    public Future<BoxedUnit> close() {
+        return client.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/ddbd7716/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
index 48a229b..ec9a7c6 100644
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
+++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
@@ -20,35 +20,52 @@ package com.twitter.distributedlog.service;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.twitter.common.zookeeper.ServerSet;
 import com.twitter.distributedlog.client.ClientConfig;
 import com.twitter.distributedlog.client.DistributedLogClientImpl;
 import com.twitter.distributedlog.client.monitor.MonitorServiceClient;
+import com.twitter.distributedlog.client.proxy.ClusterClient;
 import com.twitter.distributedlog.client.resolver.DefaultRegionResolver;
 import com.twitter.distributedlog.client.resolver.RegionResolver;
 import com.twitter.distributedlog.client.routing.RegionsRoutingService;
 import com.twitter.distributedlog.client.routing.RoutingService;
 import com.twitter.distributedlog.client.routing.RoutingUtils;
+import com.twitter.distributedlog.thrift.service.DistributedLogService;
+import com.twitter.finagle.Name;
+import com.twitter.finagle.Resolver$;
+import com.twitter.finagle.Service;
+import com.twitter.finagle.ThriftMux;
 import com.twitter.finagle.builder.ClientBuilder;
 import com.twitter.finagle.stats.NullStatsReceiver;
 import com.twitter.finagle.stats.StatsReceiver;
 import com.twitter.finagle.thrift.ClientId;
+import com.twitter.finagle.thrift.ThriftClientFramedCodec;
+import com.twitter.finagle.thrift.ThriftClientRequest;
+import com.twitter.util.Duration;
 import java.net.SocketAddress;
 import java.net.URI;
 import java.util.Random;
 import org.apache.commons.lang.StringUtils;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
 
 /**
  * Builder to build {@link DistributedLogClient}.
  */
 public final class DistributedLogClientBuilder {
 
+    private static final Logger logger = LoggerFactory.getLogger(DistributedLogClientBuilder.class);
+
     private static final Random random = new Random(System.currentTimeMillis());
 
     private String name = null;
     private ClientId clientId = null;
     private RoutingService.Builder routingServiceBuilder = null;
     private ClientBuilder clientBuilder = null;
+    private String serverRoutingServiceFinagleName = null;
     private StatsReceiver statsReceiver = new NullStatsReceiver();
     private StatsReceiver streamStatsReceiver = new NullStatsReceiver();
     private ClientConfig clientConfig = new ClientConfig();
@@ -478,6 +495,18 @@ public final class DistributedLogClientBuilder {
         return newBuilder;
     }
 
+    /**
+     * Configure the finagle name string for the server-side routing service.
+     *
+     * @param nameStr name string of the server-side routing service
+     * @return client builder
+     */
+    public DistributedLogClientBuilder serverRoutingServiceFinagleNameStr(String nameStr) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.serverRoutingServiceFinagleName = nameStr;
+        return newBuilder;
+    }
+
     DistributedLogClientBuilder clientConfig(ClientConfig clientConfig) {
         DistributedLogClientBuilder newBuilder = newBuilder(this);
         newBuilder.clientConfig = ClientConfig.newConfig(clientConfig);
@@ -499,9 +528,47 @@ public final class DistributedLogClientBuilder {
      * @return monitor service client.
      */
     public MonitorServiceClient buildMonitorClient() {
+
         return buildClient();
     }
 
+    @SuppressWarnings("unchecked")
+    ClusterClient buildServerRoutingServiceClient(String serverRoutingServiceFinagleName) {
+        ClientBuilder builder = _clientBuilder;
+        if (null == builder) {
+            builder = ClientBuilder.get()
+                    .tcpConnectTimeout(Duration.fromMilliseconds(200))
+                    .connectTimeout(Duration.fromMilliseconds(200))
+                    .requestTimeout(Duration.fromSeconds(1))
+                    .retries(20);
+            if (!_clientConfig.getThriftMux()) {
+                builder = builder.hostConnectionLimit(1);
+            }
+        }
+        if (_clientConfig.getThriftMux()) {
+            builder = builder.stack(ThriftMux.client().withClientId(_clientId));
+        } else {
+            builder = builder.codec(ThriftClientFramedCodec.apply(Option.apply(_clientId)));
+        }
+
+        Name name;
+        try {
+            name = Resolver$.MODULE$.eval(serverRoutingServiceFinagleName);
+        } catch (Exception exc) {
+            logger.error("Exception in Resolver.eval for name {}", serverRoutingServiceFinagleName, exc);
+            throw new RuntimeException(exc);
+        }
+
+        // build the client
+        Service<ThriftClientRequest, byte[]> client =
+                ClientBuilder.safeBuildFactory(
+                        builder.dest(name).reportTo(_statsReceiver.scope("routing"))
+                ).toService();
+        DistributedLogService.ServiceIface service =
+                new DistributedLogService.ServiceToClient(client, new TBinaryProtocol.Factory());
+        return new ClusterClient(client, service);
+    }
+
     DistributedLogClientImpl buildClient() {
         checkNotNull(name, "No name provided.");
         checkNotNull(clientId, "No client id provided.");
@@ -511,13 +578,27 @@ public final class DistributedLogClientBuilder {
             streamStatsReceiver = new NullStatsReceiver();
         }
 
+        Optional<ClusterClient> serverRoutingServiceClient = Optional.absent();
+        if (null != serverRoutingServiceFinagleName) {
+            serverRoutingServiceClient = Optional.of(
+                    buildServerRoutingServiceClient(serverRoutingServiceFinagleName));
+        }
+
         RoutingService routingService = routingServiceBuilder
                 .statsReceiver(statsReceiver.scope("routing"))
                 .build();
         DistributedLogClientImpl clientImpl =
                 new DistributedLogClientImpl(
-                    name, clientId, routingService, clientBuilder, clientConfig,
-                    statsReceiver, streamStatsReceiver, regionResolver, enableRegionStats);
+                        name,
+                        clientId,
+                        routingService,
+                        clientBuilder,
+                        clientConfig,
+                        serverRoutingServiceClient,
+                        statsReceiver,
+                        streamStatsReceiver,
+                        regionResolver,
+                        enableRegionStats);
         routingService.startService();
         clientImpl.handshake();
         return clientImpl;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/ddbd7716/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
index 12820d9..74da34f 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
@@ -24,8 +24,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.twitter.distributedlog.DLSN;
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.acl.AccessControlManager;
+import com.twitter.distributedlog.client.resolver.DefaultRegionResolver;
 import com.twitter.distributedlog.client.resolver.RegionResolver;
-import com.twitter.distributedlog.client.resolver.TwitterRegionResolver;
 import com.twitter.distributedlog.client.routing.RoutingService;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.exceptions.RegionUnavailableException;
@@ -234,7 +234,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
                 streamConfigProvider,
                 dlNamespace);
         this.routingService = routingService;
-        this.regionResolver = new TwitterRegionResolver();
+        this.regionResolver = new DefaultRegionResolver();
 
         // Service features
         this.featureRegionStopAcceptNewStream = this.featureProvider.getFeature(

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/ddbd7716/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java
index c37248c..4d29f21 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java
@@ -17,6 +17,7 @@
  */
 package com.twitter.distributedlog.service;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.client.DistributedLogClientImpl;
@@ -60,14 +61,12 @@ public abstract class DistributedLogServerTestCase {
 
     protected static class DLClient {
         public final LocalRoutingService routingService;
-        public final DistributedLogClientBuilder dlClientBuilder;
+        public DistributedLogClientBuilder dlClientBuilder;
         public final DistributedLogClientImpl dlClient;
 
-        protected DLClient(String name) {
-            this(name, ".*");
-        }
-
-        protected DLClient(String name, String streamNameRegex) {
+        protected DLClient(String name,
+                           String streamNameRegex,
+                           Optional<String> serverSideRoutingFinagleName) {
             routingService = LocalRoutingService.newBuilder().build();
             dlClientBuilder = DistributedLogClientBuilder.newBuilder()
                         .name(name)
@@ -79,6 +78,10 @@ public abstract class DistributedLogServerTestCase {
                             .hostConnectionLimit(1)
                             .connectionTimeout(Duration.fromSeconds(1))
                             .requestTimeout(Duration.fromSeconds(60)));
+            if (serverSideRoutingFinagleName.isPresent()) {
+                dlClientBuilder =
+                        dlClientBuilder.serverRoutingServiceFinagleNameStr(serverSideRoutingFinagleName.get());
+            }
             dlClient = (DistributedLogClientImpl) dlClientBuilder.build();
         }
 
@@ -123,6 +126,7 @@ public abstract class DistributedLogServerTestCase {
         }
     }
 
+    private final boolean clientSideRouting;
     protected DLServer dlServer;
     protected DLClient dlClient;
     protected DLServer noAdHocServer;
@@ -149,7 +153,12 @@ public abstract class DistributedLogServerTestCase {
         noAdHocCluster = createCluster(noAdHocConf);
         noAdHocCluster.start();
         noAdHocServer = new DLServer(noAdHocConf, noAdHocCluster.getUri(), 7002, false);
-        noAdHocClient = createDistributedLogClient("no-ad-hoc-client");
+        Optional<String> serverSideRoutingFinagleName = Optional.absent();
+        if (!clientSideRouting) {
+            serverSideRoutingFinagleName =
+                    Optional.of("inet!" + DLSocketAddress.toString(noAdHocServer.getAddress()));
+        }
+        noAdHocClient = createDistributedLogClient("no-ad-hoc-client", serverSideRoutingFinagleName);
     }
 
     public void tearDownNoAdHocCluster() throws Exception {
@@ -175,10 +184,19 @@ public abstract class DistributedLogServerTestCase {
         return dlCluster.getUri();
     }
 
+    protected DistributedLogServerTestCase(boolean clientSideRouting) {
+        this.clientSideRouting = clientSideRouting;
+    }
+
     @Before
     public void setup() throws Exception {
         dlServer = createDistributedLogServer(7001);
-        dlClient = createDistributedLogClient("test");
+        Optional<String> serverSideRoutingFinagleName = Optional.absent();
+        if (!clientSideRouting) {
+            serverSideRoutingFinagleName =
+                    Optional.of("inet!" + DLSocketAddress.toString(dlServer.getAddress()));
+        }
+        dlClient = createDistributedLogClient("test", serverSideRoutingFinagleName);
     }
 
     @After
@@ -200,13 +218,17 @@ public abstract class DistributedLogServerTestCase {
         return new DLServer(conf, dlCluster.getUri(), port, false);
     }
 
-    protected DLClient createDistributedLogClient(String clientName) throws Exception {
-        return createDistributedLogClient(clientName, ".*");
+    protected DLClient createDistributedLogClient(String clientName,
+                                                  Optional<String> serverSideRoutingFinagleName)
+            throws Exception {
+        return createDistributedLogClient(clientName, ".*", serverSideRoutingFinagleName);
     }
 
-    protected DLClient createDistributedLogClient(String clientName, String streamNameRegex)
+    protected DLClient createDistributedLogClient(String clientName,
+                                                  String streamNameRegex,
+                                                  Optional<String> serverSideRoutingFinagleName)
             throws Exception {
-        return new DLClient(clientName, streamNameRegex);
+        return new DLClient(clientName, streamNameRegex, serverSideRoutingFinagleName);
     }
 
     protected TwoRegionDLClient createTwoRegionDLClient(String clientName,

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/ddbd7716/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java
deleted file mode 100644
index 63723ef..0000000
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java
+++ /dev/null
@@ -1,730 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.service;
-
-import com.twitter.distributedlog.AsyncLogReader;
-import com.twitter.distributedlog.DLMTestUtil;
-import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.DistributedLogManager;
-import com.twitter.distributedlog.TestZooKeeperClientBuilder;
-import com.twitter.distributedlog.annotations.DistributedLogAnnotations;
-import com.twitter.distributedlog.exceptions.LogNotFoundException;
-import com.twitter.distributedlog.LogReader;
-import com.twitter.distributedlog.LogRecord;
-import com.twitter.distributedlog.LogRecordWithDLSN;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.acl.AccessControlManager;
-import com.twitter.distributedlog.acl.ZKAccessControl;
-import com.twitter.distributedlog.client.DistributedLogClientImpl;
-import com.twitter.distributedlog.client.routing.LocalRoutingService;
-import com.twitter.distributedlog.exceptions.DLException;
-import com.twitter.distributedlog.metadata.BKDLConfig;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.distributedlog.service.stream.StreamManagerImpl;
-import com.twitter.distributedlog.thrift.AccessControlEntry;
-import com.twitter.distributedlog.thrift.service.BulkWriteResponse;
-import com.twitter.distributedlog.thrift.service.HeartbeatOptions;
-import com.twitter.distributedlog.thrift.service.StatusCode;
-import com.twitter.distributedlog.thrift.service.WriteContext;
-import com.twitter.distributedlog.util.FailpointUtils;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.finagle.NoBrokersAvailableException;
-import com.twitter.finagle.builder.ClientBuilder;
-import com.twitter.finagle.thrift.ClientId$;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Future;
-import com.twitter.util.Futures;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class TestDistributedLogServer extends DistributedLogServerTestCase {
-    static final Logger logger = LoggerFactory.getLogger(TestDistributedLogServer.class);
-
-    @Rule
-    public TestName testName = new TestName();
-
-    /**
-     * {@link https://issues.apache.org/jira/browse/DL-27}
-     */
-    @DistributedLogAnnotations.FlakyTest
-    @Ignore
-    @Test(timeout = 60000)
-    public void testBasicWrite() throws Exception {
-        String name = "dlserver-basic-write";
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        for (long i = 1; i <= 10; i++) {
-            logger.debug("Write entry {} to stream {}.", i, name);
-            Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())));
-        }
-
-        HeartbeatOptions hbOptions = new HeartbeatOptions();
-        hbOptions.setSendHeartBeatToReader(true);
-        // make sure the first log segment of each stream created
-        FutureUtils.result(dlClient.dlClient.heartbeat(name));
-
-        DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
-        LogReader reader = dlm.getInputStream(1);
-        int numRead = 0;
-        LogRecord r = reader.readNext(false);
-        while (null != r) {
-            ++numRead;
-            int i = Integer.parseInt(new String(r.getPayload()));
-            assertEquals(numRead, i);
-            r = reader.readNext(false);
-        }
-        assertEquals(10, numRead);
-        reader.close();
-        dlm.close();
-    }
-
-    /**
-     * Sanity check to make sure both checksum flag values work.
-     */
-    @Test(timeout = 60000)
-    public void testChecksumFlag() throws Exception {
-        String name = "testChecksumFlag";
-        LocalRoutingService routingService = LocalRoutingService.newBuilder().build();
-        routingService.addHost(name, dlServer.getAddress());
-        DistributedLogClientBuilder dlClientBuilder = DistributedLogClientBuilder.newBuilder()
-            .name(name)
-            .clientId(ClientId$.MODULE$.apply("test"))
-            .routingService(routingService)
-            .handshakeWithClientInfo(true)
-            .clientBuilder(ClientBuilder.get()
-                .hostConnectionLimit(1)
-                .connectionTimeout(Duration.fromSeconds(1))
-                .requestTimeout(Duration.fromSeconds(60)))
-            .checksum(false);
-        DistributedLogClient dlClient = dlClientBuilder.build();
-        Await.result(dlClient.write(name, ByteBuffer.wrap(("1").getBytes())));
-        dlClient.close();
-
-        dlClient = dlClientBuilder.checksum(true).build();
-        Await.result(dlClient.write(name, ByteBuffer.wrap(("2").getBytes())));
-        dlClient.close();
-    }
-
-    private void runSimpleBulkWriteTest(int writeCount) throws Exception {
-        String name = String.format("dlserver-bulk-write-%d", writeCount);
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount);
-        for (long i = 1; i <= writeCount; i++) {
-            writes.add(ByteBuffer.wrap(("" + i).getBytes()));
-        }
-
-        logger.debug("Write {} entries to stream {}.", writeCount, name);
-        List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
-        assertEquals(futures.size(), writeCount);
-        for (Future<DLSN> future : futures) {
-            // No throw == pass.
-            DLSN dlsn = Await.result(future, Duration.fromSeconds(10));
-        }
-
-        DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
-        LogReader reader = dlm.getInputStream(1);
-        int numRead = 0;
-        LogRecord r = reader.readNext(false);
-        while (null != r) {
-            int i = Integer.parseInt(new String(r.getPayload()));
-            assertEquals(numRead + 1, i);
-            ++numRead;
-            r = reader.readNext(false);
-        }
-        assertEquals(writeCount, numRead);
-        reader.close();
-        dlm.close();
-    }
-
-    @Test(timeout = 60000)
-    public void testBulkWrite() throws Exception {
-        runSimpleBulkWriteTest(100);
-    }
-
-    @Test(timeout = 60000)
-    public void testBulkWriteSingleWrite() throws Exception {
-        runSimpleBulkWriteTest(1);
-    }
-
-    @Test(timeout = 60000)
-    public void testBulkWriteEmptyList() throws Exception {
-        String name = String.format("dlserver-bulk-write-%d", 0);
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        List<ByteBuffer> writes = new ArrayList<ByteBuffer>();
-        List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
-
-        assertEquals(0, futures.size());
-    }
-
-    @Test(timeout = 60000)
-    public void testBulkWriteNullArg() throws Exception {
-
-        String name = String.format("dlserver-bulk-write-%s", "null");
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        List<ByteBuffer> writes = new ArrayList<ByteBuffer>();
-        writes.add(null);
-
-        try {
-            List<Future<DLSN>> futureResult = dlClient.dlClient.writeBulk(name, writes);
-            fail("should not have succeeded");
-        } catch (NullPointerException npe) {
-            ; // expected
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testBulkWriteEmptyBuffer() throws Exception {
-        String name = String.format("dlserver-bulk-write-%s", "empty");
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        List<ByteBuffer> writes = new ArrayList<ByteBuffer>();
-        writes.add(ByteBuffer.wrap(("").getBytes()));
-        writes.add(ByteBuffer.wrap(("").getBytes()));
-        List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
-        assertEquals(2, futures.size());
-        for (Future<DLSN> future : futures) {
-            // No throw == pass
-            DLSN dlsn = Await.result(future, Duration.fromSeconds(10));
-        }
-    }
-
-    void failDueToWrongException(Exception ex) {
-        logger.info("testBulkWritePartialFailure: ", ex);
-        fail(String.format("failed with wrong exception %s", ex.getClass().getName()));
-    }
-
-    int validateAllFailedAsCancelled(List<Future<DLSN>> futures, int start, int finish) {
-        int failed = 0;
-        for (int i = start; i < finish; i++) {
-            Future<DLSN> future = futures.get(i);
-            try {
-                DLSN dlsn = Await.result(future, Duration.fromSeconds(10));
-                fail("future should have failed!");
-            } catch (DLException cre) {
-                ++failed;
-            } catch (Exception ex) {
-                failDueToWrongException(ex);
-            }
-        }
-        return failed;
-    }
-
-    void validateFailedAsLogRecordTooLong(Future<DLSN> future) {
-        try {
-            DLSN dlsn = Await.result(future, Duration.fromSeconds(10));
-            fail("should have failed");
-        } catch (DLException dle) {
-            assertEquals(StatusCode.TOO_LARGE_RECORD, dle.getCode());
-        } catch (Exception ex) {
-            failDueToWrongException(ex);
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testBulkWritePartialFailure() throws Exception {
-        String name = String.format("dlserver-bulk-write-%s", "partial-failure");
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        final int writeCount = 100;
-
-        List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount*2 + 1);
-        for (long i = 1; i <= writeCount; i++) {
-            writes.add(ByteBuffer.wrap(("" + i).getBytes()));
-        }
-        // Too big, will cause partial failure.
-        ByteBuffer buf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1);
-        writes.add(buf);
-        for (long i = 1; i <= writeCount; i++) {
-            writes.add(ByteBuffer.wrap(("" + i).getBytes()));
-        }
-
-        // Count succeeded.
-        List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
-        int succeeded = 0;
-        for (int i = 0; i < writeCount; i++) {
-            Future<DLSN> future = futures.get(i);
-            try {
-                DLSN dlsn = Await.result(future, Duration.fromSeconds(10));
-                ++succeeded;
-            } catch (Exception ex) {
-                failDueToWrongException(ex);
-            }
-        }
-
-        validateFailedAsLogRecordTooLong(futures.get(writeCount));
-        FutureUtils.result(Futures.collect(futures.subList(writeCount + 1, 2 * writeCount + 1)));
-        assertEquals(writeCount, succeeded);
-    }
-
-    @Test(timeout = 60000)
-    public void testBulkWriteTotalFailureFirstWriteFailed() throws Exception {
-        String name = String.format("dlserver-bulk-write-%s", "first-write-failed");
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        final int writeCount = 100;
-        List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount + 1);
-        ByteBuffer buf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1);
-        writes.add(buf);
-        for (long i = 1; i <= writeCount; i++) {
-            writes.add(ByteBuffer.wrap(("" + i).getBytes()));
-        }
-
-        List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
-        validateFailedAsLogRecordTooLong(futures.get(0));
-        FutureUtils.result(Futures.collect(futures.subList(1, writeCount + 1)));
-    }
-
-    @Test(timeout = 60000)
-    public void testBulkWriteTotalFailureLostLock() throws Exception {
-        String name = String.format("dlserver-bulk-write-%s", "lost-lock");
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        final int writeCount = 8;
-        List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount + 1);
-        ByteBuffer buf = ByteBuffer.allocate(8);
-        writes.add(buf);
-        for (long i = 1; i <= writeCount; i++) {
-            writes.add(ByteBuffer.wrap(("" + i).getBytes()));
-        }
-        // Warm it up with a write.
-        Await.result(dlClient.dlClient.write(name, ByteBuffer.allocate(8)));
-
-        // Failpoint a lost lock, make sure the failure gets promoted to an operation failure.
-        DistributedLogServiceImpl svcImpl = (DistributedLogServiceImpl) dlServer.dlServer.getLeft();
-        try {
-            FailpointUtils.setFailpoint(
-                FailpointUtils.FailPointName.FP_WriteInternalLostLock,
-                FailpointUtils.FailPointActions.FailPointAction_Default
-            );
-            Future<BulkWriteResponse> futures = svcImpl.writeBulkWithContext(name, writes, new WriteContext());
-            assertEquals(StatusCode.LOCKING_EXCEPTION, Await.result(futures).header.code);
-        } finally {
-            FailpointUtils.removeFailpoint(
-                FailpointUtils.FailPointName.FP_WriteInternalLostLock
-            );
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testHeartbeat() throws Exception {
-        String name = "dlserver-heartbeat";
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        for (long i = 1; i <= 10; i++) {
-            logger.debug("Send heartbeat {} to stream {}.", i, name);
-            dlClient.dlClient.check(name).get();
-        }
-
-        logger.debug("Write entry one to stream {}.", name);
-        dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes())).get();
-
-        Thread.sleep(1000);
-
-        DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
-        LogReader reader = dlm.getInputStream(DLSN.InitialDLSN);
-        int numRead = 0;
-        // eid=0 => control records
-        // other 9 heartbeats will not trigger writing any control records.
-        // eid=1 => user entry
-        long startEntryId = 1;
-        LogRecordWithDLSN r = reader.readNext(false);
-        while (null != r) {
-            int i = Integer.parseInt(new String(r.getPayload()));
-            assertEquals(numRead + 1, i);
-            assertEquals(r.getDlsn().compareTo(new DLSN(1, startEntryId, 0)), 0);
-            ++numRead;
-            ++startEntryId;
-            r = reader.readNext(false);
-        }
-        assertEquals(1, numRead);
-    }
-
-    @Test(timeout = 60000)
-    public void testFenceWrite() throws Exception {
-        String name = "dlserver-fence-write";
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        for (long i = 1; i <= 10; i++) {
-            logger.debug("Write entry {} to stream {}.", i, name);
-            dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())).get();
-        }
-
-        Thread.sleep(1000);
-
-        logger.info("Fencing stream {}.", name);
-        DLMTestUtil.fenceStream(conf, getUri(), name);
-        logger.info("Fenced stream {}.", name);
-
-        for (long i = 11; i <= 20; i++) {
-            logger.debug("Write entry {} to stream {}.", i, name);
-            dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())).get();
-        }
-
-        DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
-        LogReader reader = dlm.getInputStream(1);
-        int numRead = 0;
-        LogRecord r = reader.readNext(false);
-        while (null != r) {
-            int i = Integer.parseInt(new String(r.getPayload()));
-            assertEquals(numRead + 1, i);
-            ++numRead;
-            r = reader.readNext(false);
-        }
-        assertEquals(20, numRead);
-        reader.close();
-        dlm.close();
-    }
-
-    @Test(timeout = 60000)
-    public void testDeleteStream() throws Exception {
-        String name = "dlserver-delete-stream";
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        long txid = 101;
-        for (long i = 1; i <= 10; i++) {
-            long curTxId = txid++;
-            logger.debug("Write entry {} to stream {}.", curTxId, name);
-            dlClient.dlClient.write(name,
-                    ByteBuffer.wrap(("" + curTxId).getBytes())).get();
-        }
-
-        checkStream(1, 1, 1, name, dlServer.getAddress(), true, true);
-
-        dlClient.dlClient.delete(name).get();
-
-        checkStream(0, 0, 0, name, dlServer.getAddress(), false, false);
-
-        Thread.sleep(1000);
-
-        DistributedLogManager dlm101 = DLMTestUtil.createNewDLM(name, conf, getUri());
-        AsyncLogReader reader101 = FutureUtils.result(dlm101.openAsyncLogReader(DLSN.InitialDLSN));
-        try {
-            FutureUtils.result(reader101.readNext());
-            fail("Should fail with LogNotFoundException since the stream is deleted");
-        } catch (LogNotFoundException lnfe) {
-            // expected
-        }
-        FutureUtils.result(reader101.asyncClose());
-        dlm101.close();
-
-        txid = 201;
-        for (long i = 1; i <= 10; i++) {
-            long curTxId = txid++;
-            logger.debug("Write entry {} to stream {}.", curTxId, name);
-            DLSN dlsn = dlClient.dlClient.write(name,
-                    ByteBuffer.wrap(("" + curTxId).getBytes())).get();
-        }
-        Thread.sleep(1000);
-
-        DistributedLogManager dlm201 = DLMTestUtil.createNewDLM(name, conf, getUri());
-        LogReader reader201 = dlm201.getInputStream(1);
-        int numRead = 0;
-        int curTxId = 201;
-        LogRecord r = reader201.readNext(false);
-        while (null != r) {
-            int i = Integer.parseInt(new String(r.getPayload()));
-            assertEquals(curTxId++, i);
-            ++numRead;
-            r = reader201.readNext(false);
-        }
-        assertEquals(10, numRead);
-        reader201.close();
-        dlm201.close();
-    }
-
-    @Test(timeout = 60000)
-    public void testCreateStream() throws Exception {
-        try {
-            setupNoAdHocCluster();
-            final String name = "dlserver-create-stream";
-
-            noAdHocClient.routingService.addHost("dlserver-create-stream", noAdHocServer.getAddress());
-            assertFalse(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
-            assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn());
-
-            long txid = 101;
-            for (long i = 1; i <= 10; i++) {
-                long curTxId = txid++;
-                logger.debug("Write entry {} to stream {}.", curTxId, name);
-                noAdHocClient.dlClient.write(name,
-                    ByteBuffer.wrap(("" + curTxId).getBytes())).get();
-            }
-
-            assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
-        } finally {
-            tearDownNoAdHocCluster();
-        }
-    }
-
-    /** This tests that create has touch like behavior in that trying to create the stream twice, simply does nothing */
-    @Test(timeout = 60000)
-    public void testCreateStreamTwice() throws Exception {
-        try {
-            setupNoAdHocCluster();
-            final String name = "dlserver-create-stream-twice";
-
-            noAdHocClient.routingService.addHost("dlserver-create-stream-twice", noAdHocServer.getAddress());
-            assertFalse(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
-            assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn());
-
-            long txid = 101;
-            for (long i = 1; i <= 10; i++) {
-                long curTxId = txid++;
-                logger.debug("Write entry {} to stream {}.", curTxId, name);
-                noAdHocClient.dlClient.write(name,
-                    ByteBuffer.wrap(("" + curTxId).getBytes())).get();
-            }
-
-            assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
-
-            // create again
-            assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn());
-            assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
-        } finally {
-            tearDownNoAdHocCluster();
-        }
-    }
-
-
-
-    @Test(timeout = 60000)
-    public void testTruncateStream() throws Exception {
-        String name = "dlserver-truncate-stream";
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        long txid = 1;
-        Map<Long, DLSN> txid2DLSN = new HashMap<Long, DLSN>();
-        for (int s = 1; s <= 3; s++) {
-            for (long i = 1; i <= 10; i++) {
-                long curTxId = txid++;
-                logger.debug("Write entry {} to stream {}.", curTxId, name);
-                DLSN dlsn = dlClient.dlClient.write(name,
-                        ByteBuffer.wrap(("" + curTxId).getBytes())).get();
-                txid2DLSN.put(curTxId, dlsn);
-            }
-            if (s <= 2) {
-                dlClient.dlClient.release(name).get();
-            }
-        }
-
-        DLSN dlsnToDelete = txid2DLSN.get(21L);
-        dlClient.dlClient.truncate(name, dlsnToDelete).get();
-
-        DistributedLogManager readDLM = DLMTestUtil.createNewDLM(name, conf, getUri());
-        LogReader reader = readDLM.getInputStream(1);
-        int numRead = 0;
-        int curTxId = 11;
-        LogRecord r = reader.readNext(false);
-        while (null != r) {
-            int i = Integer.parseInt(new String(r.getPayload()));
-            assertEquals(curTxId++, i);
-            ++numRead;
-            r = reader.readNext(false);
-        }
-        assertEquals(20, numRead);
-        reader.close();
-        readDLM.close();
-    }
-
-    @Test(timeout = 60000)
-    public void testRequestDenied() throws Exception {
-        String name = "request-denied";
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        AccessControlEntry ace = new AccessControlEntry();
-        ace.setDenyWrite(true);
-        ZooKeeperClient zkc = TestZooKeeperClientBuilder
-                .newBuilder()
-                .uri(getUri())
-                .connectionTimeoutMs(60000)
-                .sessionTimeoutMs(60000)
-                .build();
-        DistributedLogNamespace dlNamespace = dlServer.dlServer.getLeft().getDistributedLogNamespace();
-        BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkc, getUri());
-        String zkPath = getUri().getPath() + "/" + bkdlConfig.getACLRootPath() + "/" + name;
-        ZKAccessControl accessControl = new ZKAccessControl(ace, zkPath);
-        accessControl.create(zkc);
-
-        AccessControlManager acm = dlNamespace.createAccessControlManager();
-        while (acm.allowWrite(name)) {
-            Thread.sleep(100);
-        }
-
-        try {
-            Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
-            fail("Should fail with request denied exception");
-        } catch (DLException dle) {
-            assertEquals(StatusCode.REQUEST_DENIED, dle.getCode());
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testNoneStreamNameRegex() throws Exception {
-        String streamNamePrefix = "none-stream-name-regex-";
-        int numStreams = 5;
-        Set<String> streams = new HashSet<String>();
-
-        for (int i = 0; i < numStreams; i++) {
-            streams.add(streamNamePrefix + i);
-        }
-        testStreamNameRegex(streams, ".*", streams);
-    }
-
-    @Test(timeout = 60000)
-    public void testStreamNameRegex() throws Exception {
-        String streamNamePrefix = "stream-name-regex-";
-        int numStreams = 5;
-        Set<String> streams = new HashSet<String>();
-        Set<String> expectedStreams = new HashSet<String>();
-        String streamNameRegex = streamNamePrefix + "1";
-
-        for (int i = 0; i < numStreams; i++) {
-            streams.add(streamNamePrefix + i);
-        }
-        expectedStreams.add(streamNamePrefix + "1");
-
-        testStreamNameRegex(streams, streamNameRegex, expectedStreams);
-    }
-
-    private void testStreamNameRegex(Set<String> streams, String streamNameRegex,
-                                     Set<String> expectedStreams)
-            throws Exception {
-        for (String streamName : streams) {
-            dlClient.routingService.addHost(streamName, dlServer.getAddress());
-            Await.result(dlClient.dlClient.write(streamName,
-                    ByteBuffer.wrap(streamName.getBytes(UTF_8))));
-        }
-
-        DLClient client = createDistributedLogClient("test-stream-name-regex", streamNameRegex);
-        try {
-            client.routingService.addHost("unknown", dlServer.getAddress());
-            client.handshake();
-            Map<SocketAddress, Set<String>> distribution =
-                    client.dlClient.getStreamOwnershipDistribution();
-            assertEquals(1, distribution.size());
-            Set<String> cachedStreams = distribution.values().iterator().next();
-            assertNotNull(cachedStreams);
-            assertEquals(expectedStreams.size(), cachedStreams.size());
-
-            for (String streamName : cachedStreams) {
-                assertTrue(expectedStreams.contains(streamName));
-            }
-        } finally {
-            client.shutdown();
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testReleaseStream() throws Exception {
-        String name = "dlserver-release-stream";
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-
-        Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
-        checkStream(1, 1, 1, name, dlServer.getAddress(), true, true);
-
-        // release the stream
-        Await.result(dlClient.dlClient.release(name));
-        checkStream(0, 0, 0, name, dlServer.getAddress(), false, false);
-    }
-
-    @Test(timeout = 60000)
-    public void testAcceptNewStream() throws Exception {
-        String name = "dlserver-accept-new-stream";
-
-        dlClient.routingService.addHost(name, dlServer.getAddress());
-        dlClient.routingService.setAllowRetrySameHost(false);
-
-        Await.result(dlClient.dlClient.setAcceptNewStream(false));
-
-        try {
-            Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
-            fail("Should fail because the proxy couldn't accept new stream");
-        } catch (NoBrokersAvailableException nbae) {
-            // expected
-        }
-        checkStream(0, 0, 0, name, dlServer.getAddress(), false, false);
-
-        Await.result(dlServer.dlServer.getLeft().setAcceptNewStream(true));
-        Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
-        checkStream(1, 1, 1, name, dlServer.getAddress(), true, true);
-    }
-
-    private void checkStream(int expectedNumProxiesInClient, int expectedClientCacheSize, int expectedServerCacheSize,
-                             String name, SocketAddress owner, boolean existedInServer, boolean existedInClient) {
-        Map<SocketAddress, Set<String>> distribution = dlClient.dlClient.getStreamOwnershipDistribution();
-        assertEquals(expectedNumProxiesInClient, distribution.size());
-
-        if (expectedNumProxiesInClient > 0) {
-            Map.Entry<SocketAddress, Set<String>> localEntry =
-                    distribution.entrySet().iterator().next();
-            assertEquals(owner, localEntry.getKey());
-            assertEquals(expectedClientCacheSize, localEntry.getValue().size());
-            assertEquals(existedInClient, localEntry.getValue().contains(name));
-        }
-
-
-        StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager();
-        Set<String> cachedStreams = streamManager.getCachedStreams().keySet();
-        Set<String> acquiredStreams = streamManager.getCachedStreams().keySet();
-
-        assertEquals(expectedServerCacheSize, cachedStreams.size());
-        assertEquals(existedInServer, cachedStreams.contains(name));
-        assertEquals(expectedServerCacheSize, acquiredStreams.size());
-        assertEquals(existedInServer, acquiredStreams.contains(name));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/ddbd7716/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java
new file mode 100644
index 0000000..a0853e4
--- /dev/null
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java
@@ -0,0 +1,714 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service;
+
+import com.google.common.base.Optional;
+import com.twitter.distributedlog.AsyncLogReader;
+import com.twitter.distributedlog.DLMTestUtil;
+import com.twitter.distributedlog.DLSN;
+import com.twitter.distributedlog.DistributedLogManager;
+import com.twitter.distributedlog.TestZooKeeperClientBuilder;
+import com.twitter.distributedlog.annotations.DistributedLogAnnotations;
+import com.twitter.distributedlog.exceptions.LogNotFoundException;
+import com.twitter.distributedlog.LogReader;
+import com.twitter.distributedlog.LogRecord;
+import com.twitter.distributedlog.LogRecordWithDLSN;
+import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.acl.AccessControlManager;
+import com.twitter.distributedlog.acl.ZKAccessControl;
+import com.twitter.distributedlog.client.routing.LocalRoutingService;
+import com.twitter.distributedlog.exceptions.DLException;
+import com.twitter.distributedlog.metadata.BKDLConfig;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.distributedlog.service.stream.StreamManagerImpl;
+import com.twitter.distributedlog.thrift.AccessControlEntry;
+import com.twitter.distributedlog.thrift.service.BulkWriteResponse;
+import com.twitter.distributedlog.thrift.service.HeartbeatOptions;
+import com.twitter.distributedlog.thrift.service.StatusCode;
+import com.twitter.distributedlog.thrift.service.WriteContext;
+import com.twitter.distributedlog.util.FailpointUtils;
+import com.twitter.distributedlog.util.FutureUtils;
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.thrift.ClientId$;
+import com.twitter.util.Await;
+import com.twitter.util.Duration;
+import com.twitter.util.Future;
+import com.twitter.util.Futures;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public abstract class TestDistributedLogServerBase extends DistributedLogServerTestCase {
+    static final Logger logger = LoggerFactory.getLogger(TestDistributedLogServerBase.class);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    /**
+     * Creates the test base and forwards the routing mode to the parent test case.
+     *
+     * @param clientSideRouting passed unchanged to the {@code DistributedLogServerTestCase}
+     *                          constructor
+     */
+    protected TestDistributedLogServerBase(boolean clientSideRouting) {
+        super(clientSideRouting);
+    }
+
+    /**
+     * Writes ten numbered records through the proxy client and reads them back in order.
+     *
+     * <p>Currently ignored as flaky; see
+     * <a href="https://issues.apache.org/jira/browse/DL-27">DL-27</a>.
+     */
+    @DistributedLogAnnotations.FlakyTest
+    @Ignore
+    @Test(timeout = 60000)
+    public void testBasicWrite() throws Exception {
+        String name = "dlserver-basic-write";
+
+        dlClient.routingService.addHost(name, dlServer.getAddress());
+
+        for (long i = 1; i <= 10; i++) {
+            logger.debug("Write entry {} to stream {}.", i, name);
+            Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes(UTF_8))));
+        }
+
+        // make sure the first log segment of the stream is created
+        FutureUtils.result(dlClient.dlClient.heartbeat(name));
+
+        DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
+        try {
+            LogReader reader = dlm.getInputStream(1);
+            try {
+                int numRead = 0;
+                LogRecord r = reader.readNext(false);
+                while (null != r) {
+                    ++numRead;
+                    // Payloads are the decimal record index, written above as UTF-8.
+                    int i = Integer.parseInt(new String(r.getPayload(), UTF_8));
+                    assertEquals(numRead, i);
+                    r = reader.readNext(false);
+                }
+                assertEquals(10, numRead);
+            } finally {
+                // Close even when an assertion fails so the reader is not leaked.
+                reader.close();
+            }
+        } finally {
+            dlm.close();
+        }
+    }
+
+    /**
+     * Sanity check to make sure both checksum flag values work.
+     *
+     * <p>Builds one client with {@code checksum(false)} and one with {@code checksum(true)} from
+     * the same builder and issues a single write with each. Each client is closed even if its
+     * write fails.
+     */
+    @Test(timeout = 60000)
+    public void testChecksumFlag() throws Exception {
+        String name = "testChecksumFlag";
+        LocalRoutingService routingService = LocalRoutingService.newBuilder().build();
+        routingService.addHost(name, dlServer.getAddress());
+        DistributedLogClientBuilder dlClientBuilder = DistributedLogClientBuilder.newBuilder()
+            .name(name)
+            .clientId(ClientId$.MODULE$.apply("test"))
+            .routingService(routingService)
+            .handshakeWithClientInfo(true)
+            .clientBuilder(ClientBuilder.get()
+                .hostConnectionLimit(1)
+                .connectionTimeout(Duration.fromSeconds(1))
+                .requestTimeout(Duration.fromSeconds(60)))
+            .checksum(false);
+
+        // Checksum disabled.
+        DistributedLogClient dlClient = dlClientBuilder.build();
+        try {
+            Await.result(dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
+        } finally {
+            dlClient.close();
+        }
+
+        // Checksum enabled.
+        dlClient = dlClientBuilder.checksum(true).build();
+        try {
+            Await.result(dlClient.write(name, ByteBuffer.wrap("2".getBytes(UTF_8))));
+        } finally {
+            dlClient.close();
+        }
+    }
+
+    /**
+     * Bulk-writes {@code writeCount} numbered records ("1" .. "writeCount") to a stream named
+     * after the count, waits for every returned future, then reads the stream back and checks
+     * that the records come out in write order.
+     *
+     * @param writeCount number of records to submit in the single bulk call
+     * @throws Exception if a write future fails or the stream cannot be read back
+     */
+    private void runSimpleBulkWriteTest(int writeCount) throws Exception {
+        String name = String.format("dlserver-bulk-write-%d", writeCount);
+
+        dlClient.routingService.addHost(name, dlServer.getAddress());
+
+        List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount);
+        for (long i = 1; i <= writeCount; i++) {
+            writes.add(ByteBuffer.wrap(("" + i).getBytes(UTF_8)));
+        }
+
+        logger.debug("Write {} entries to stream {}.", writeCount, name);
+        List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
+        // JUnit order is (expected, actual); keeps failure messages accurate.
+        assertEquals(writeCount, futures.size());
+        for (Future<DLSN> future : futures) {
+            // No throw == pass.
+            Await.result(future, Duration.fromSeconds(10));
+        }
+
+        DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
+        try {
+            LogReader reader = dlm.getInputStream(1);
+            try {
+                int numRead = 0;
+                LogRecord r = reader.readNext(false);
+                while (null != r) {
+                    int i = Integer.parseInt(new String(r.getPayload(), UTF_8));
+                    assertEquals(numRead + 1, i);
+                    ++numRead;
+                    r = reader.readNext(false);
+                }
+                assertEquals(writeCount, numRead);
+            } finally {
+                // Close even when an assertion fails so the reader is not leaked.
+                reader.close();
+            }
+        } finally {
+            dlm.close();
+        }
+    }
+
+    /** Runs the simple bulk-write round trip with 100 records. */
+    @Test(timeout = 60000)
+    public void testBulkWrite() throws Exception {
+        runSimpleBulkWriteTest(100);
+    }
+
+    /** Runs the simple bulk-write round trip with a single record. */
+    @Test(timeout = 60000)
+    public void testBulkWriteSingleWrite() throws Exception {
+        runSimpleBulkWriteTest(1);
+    }
+
+    /** Bulk-writing an empty list must return an empty list of futures. */
+    @Test(timeout = 60000)
+    public void testBulkWriteEmptyList() throws Exception {
+        final String streamName = String.format("dlserver-bulk-write-%d", 0);
+        dlClient.routingService.addHost(streamName, dlServer.getAddress());
+
+        final List<ByteBuffer> noPayloads = new ArrayList<ByteBuffer>();
+        final List<Future<DLSN>> results = dlClient.dlClient.writeBulk(streamName, noPayloads);
+
+        assertEquals(0, results.size());
+    }
+
+    @Test(timeout = 60000)
+    public void testBulkWriteNullArg() throws Exception {
+
+        String name = String.format("dlserver-bulk-write-%s", "null");
+
+        dlClient.routingService.addHost(name, dlServer.getAddress());
+
+        List<ByteBuffer> writes = new ArrayList<ByteBuffer>();
+        writes.add(null);
+
+        try {
+            List<Future<DLSN>> futureResult = dlClient.dlClient.writeBulk(name, writes);
+            fail("should not have succeeded");
+        } catch (NullPointerException npe) {
+            ; // expected
+        }
+    }
+
+    /** Bulk-writing two zero-length buffers must produce two futures that both complete. */
+    @Test(timeout = 60000)
+    public void testBulkWriteEmptyBuffer() throws Exception {
+        final String streamName = String.format("dlserver-bulk-write-%s", "empty");
+        dlClient.routingService.addHost(streamName, dlServer.getAddress());
+
+        final List<ByteBuffer> payloads = new ArrayList<ByteBuffer>();
+        for (int n = 0; n < 2; n++) {
+            payloads.add(ByteBuffer.wrap(("").getBytes()));
+        }
+
+        final List<Future<DLSN>> results = dlClient.dlClient.writeBulk(streamName, payloads);
+        assertEquals(2, results.size());
+        for (Future<DLSN> pending : results) {
+            // Completing without an exception is the pass condition.
+            Await.result(pending, Duration.fromSeconds(10));
+        }
+    }
+
+    void failDueToWrongException(Exception ex) {
+        logger.info("testBulkWritePartialFailure: ", ex);
+        fail(String.format("failed with wrong exception %s", ex.getClass().getName()));
+    }
+
+    int validateAllFailedAsCancelled(List<Future<DLSN>> futures, int start, int finish) {
+        int failed = 0;
+        for (int i = start; i < finish; i++) {
+            Future<DLSN> future = futures.get(i);
+            try {
+                DLSN dlsn = Await.result(future, Duration.fromSeconds(10));
+                fail("future should have failed!");
+            } catch (DLException cre) {
+                ++failed;
+            } catch (Exception ex) {
+                failDueToWrongException(ex);
+            }
+        }
+        return failed;
+    }
+
+    void validateFailedAsLogRecordTooLong(Future<DLSN> future) {
+        try {
+            DLSN dlsn = Await.result(future, Duration.fromSeconds(10));
+            fail("should have failed");
+        } catch (DLException dle) {
+            assertEquals(StatusCode.TOO_LARGE_RECORD, dle.getCode());
+        } catch (Exception ex) {
+            failDueToWrongException(ex);
+        }
+    }
+
+    /**
+     * Bulk-writes 100 small records, one record larger than {@code MAX_LOGRECORD_SIZE}, then
+     * another 100 small records. The first 100 must succeed, the oversized one must fail with
+     * {@code TOO_LARGE_RECORD}, and the trailing 100 are awaited as a group.
+     */
+    @Test(timeout = 60000)
+    public void testBulkWritePartialFailure() throws Exception {
+        String name = String.format("dlserver-bulk-write-%s", "partial-failure");
+
+        dlClient.routingService.addHost(name, dlServer.getAddress());
+
+        final int writeCount = 100;
+
+        List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount * 2 + 1);
+        for (long i = 1; i <= writeCount; i++) {
+            writes.add(ByteBuffer.wrap(("" + i).getBytes(UTF_8)));
+        }
+        // Too big, will cause partial failure.
+        ByteBuffer buf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1);
+        writes.add(buf);
+        for (long i = 1; i <= writeCount; i++) {
+            writes.add(ByteBuffer.wrap(("" + i).getBytes(UTF_8)));
+        }
+
+        // Count succeeded.
+        List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
+        int succeeded = 0;
+        for (int i = 0; i < writeCount; i++) {
+            Future<DLSN> future = futures.get(i);
+            try {
+                Await.result(future, Duration.fromSeconds(10));
+                ++succeeded;
+            } catch (Exception ex) {
+                failDueToWrongException(ex);
+            }
+        }
+
+        validateFailedAsLogRecordTooLong(futures.get(writeCount));
+        // Records after the oversized one are awaited together.
+        FutureUtils.result(Futures.collect(futures.subList(writeCount + 1, 2 * writeCount + 1)));
+        assertEquals(writeCount, succeeded);
+    }
+
+    /**
+     * Bulk-writes one oversized record followed by 100 small records. The first future must
+     * fail with {@code TOO_LARGE_RECORD}; the remaining futures are awaited as a group.
+     */
+    @Test(timeout = 60000)
+    public void testBulkWriteTotalFailureFirstWriteFailed() throws Exception {
+        String name = String.format("dlserver-bulk-write-%s", "first-write-failed");
+
+        dlClient.routingService.addHost(name, dlServer.getAddress());
+
+        final int writeCount = 100;
+        List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount + 1);
+        // One byte over the limit so the very first record is rejected.
+        ByteBuffer buf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1);
+        writes.add(buf);
+        for (long i = 1; i <= writeCount; i++) {
+            writes.add(ByteBuffer.wrap(("" + i).getBytes(UTF_8)));
+        }
+
+        List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
+        validateFailedAsLogRecordTooLong(futures.get(0));
+        FutureUtils.result(Futures.collect(futures.subList(1, writeCount + 1)));
+    }
+
+    /**
+     * Arms the {@code FP_WriteInternalLostLock} failpoint and checks that a bulk write issued
+     * directly against the service implementation reports {@code LOCKING_EXCEPTION} in its
+     * response header. The failpoint is always removed afterwards.
+     */
+    @Test(timeout = 60000)
+    public void testBulkWriteTotalFailureLostLock() throws Exception {
+        String name = String.format("dlserver-bulk-write-%s", "lost-lock");
+
+        dlClient.routingService.addHost(name, dlServer.getAddress());
+
+        final int writeCount = 8;
+        List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount + 1);
+        ByteBuffer buf = ByteBuffer.allocate(8);
+        writes.add(buf);
+        for (long i = 1; i <= writeCount; i++) {
+            writes.add(ByteBuffer.wrap(("" + i).getBytes(UTF_8)));
+        }
+        // Warm it up with a write.
+        Await.result(dlClient.dlClient.write(name, ByteBuffer.allocate(8)));
+
+        // Failpoint a lost lock, make sure the failure gets promoted to an operation failure.
+        DistributedLogServiceImpl svcImpl = (DistributedLogServiceImpl) dlServer.dlServer.getLeft();
+        try {
+            FailpointUtils.setFailpoint(
+                FailpointUtils.FailPointName.FP_WriteInternalLostLock,
+                FailpointUtils.FailPointActions.FailPointAction_Default
+            );
+            Future<BulkWriteResponse> response =
+                svcImpl.writeBulkWithContext(name, writes, new WriteContext());
+            assertEquals(StatusCode.LOCKING_EXCEPTION, Await.result(response).header.code);
+        } finally {
+            FailpointUtils.removeFailpoint(
+                FailpointUtils.FailPointName.FP_WriteInternalLostLock
+            );
+        }
+    }
+
+    /**
+     * Sends ten {@code check} requests followed by one user write, then reads the stream from
+     * {@code DLSN.InitialDLSN} and expects exactly one user record, located at entry id 1 of
+     * log segment 1.
+     */
+    @Test(timeout = 60000)
+    public void testHeartbeat() throws Exception {
+        String name = "dlserver-heartbeat";
+
+        dlClient.routingService.addHost(name, dlServer.getAddress());
+
+        for (long i = 1; i <= 10; i++) {
+            logger.debug("Send heartbeat {} to stream {}.", i, name);
+            dlClient.dlClient.check(name).get();
+        }
+
+        logger.debug("Write entry one to stream {}.", name);
+        dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))).get();
+
+        Thread.sleep(1000);
+
+        DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
+        try {
+            LogReader reader = dlm.getInputStream(DLSN.InitialDLSN);
+            try {
+                int numRead = 0;
+                // eid=0 => control records
+                // other 9 heartbeats will not trigger writing any control records.
+                // eid=1 => user entry
+                long startEntryId = 1;
+                LogRecordWithDLSN r = reader.readNext(false);
+                while (null != r) {
+                    int i = Integer.parseInt(new String(r.getPayload(), UTF_8));
+                    assertEquals(numRead + 1, i);
+                    // JUnit order is (expected, actual).
+                    assertEquals(0, r.getDlsn().compareTo(new DLSN(1, startEntryId, 0)));
+                    ++numRead;
+                    ++startEntryId;
+                    r = reader.readNext(false);
+                }
+                assertEquals(1, numRead);
+            } finally {
+                // The original never closed the reader or the manager.
+                reader.close();
+            }
+        } finally {
+            dlm.close();
+        }
+    }
+
+    /**
+     * Writes records 1..10, fences the stream via {@code DLMTestUtil.fenceStream}, writes
+     * records 11..20 through the same client, and checks that all 20 records read back in
+     * order.
+     */
+    @Test(timeout = 60000)
+    public void testFenceWrite() throws Exception {
+        String name = "dlserver-fence-write";
+
+        dlClient.routingService.addHost(name, dlServer.getAddress());
+
+        for (long i = 1; i <= 10; i++) {
+            logger.debug("Write entry {} to stream {}.", i, name);
+            dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes(UTF_8))).get();
+        }
+
+        Thread.sleep(1000);
+
+        logger.info("Fencing stream {}.", name);
+        DLMTestUtil.fenceStream(conf, getUri(), name);
+        logger.info("Fenced stream {}.", name);
+
+        for (long i = 11; i <= 20; i++) {
+            logger.debug("Write entry {} to stream {}.", i, name);
+            dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes(UTF_8))).get();
+        }
+
+        DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
+        try {
+            LogReader reader = dlm.getInputStream(1);
+            try {
+                int numRead = 0;
+                LogRecord r = reader.readNext(false);
+                while (null != r) {
+                    int i = Integer.parseInt(new String(r.getPayload(), UTF_8));
+                    assertEquals(numRead + 1, i);
+                    ++numRead;
+                    r = reader.readNext(false);
+                }
+                assertEquals(20, numRead);
+            } finally {
+                // Close even when an assertion fails so the reader is not leaked.
+                reader.close();
+            }
+        } finally {
+            dlm.close();
+        }
+    }
+
+    /**
+     * Writes ten records (101..110), deletes the stream, and checks that client and server
+     * caches are emptied and that reading fails with {@link LogNotFoundException}. Then writes
+     * ten new records (201..210) to the same name and checks that only those are readable.
+     */
+    @Test(timeout = 60000)
+    public void testDeleteStream() throws Exception {
+        String name = "dlserver-delete-stream";
+
+        dlClient.routingService.addHost(name, dlServer.getAddress());
+
+        long txid = 101;
+        for (long i = 1; i <= 10; i++) {
+            long curTxId = txid++;
+            logger.debug("Write entry {} to stream {}.", curTxId, name);
+            dlClient.dlClient.write(name,
+                    ByteBuffer.wrap(("" + curTxId).getBytes(UTF_8))).get();
+        }
+
+        checkStream(1, 1, 1, name, dlServer.getAddress(), true, true);
+
+        dlClient.dlClient.delete(name).get();
+
+        checkStream(0, 0, 0, name, dlServer.getAddress(), false, false);
+
+        Thread.sleep(1000);
+
+        DistributedLogManager dlm101 = DLMTestUtil.createNewDLM(name, conf, getUri());
+        try {
+            AsyncLogReader reader101 =
+                    FutureUtils.result(dlm101.openAsyncLogReader(DLSN.InitialDLSN));
+            try {
+                FutureUtils.result(reader101.readNext());
+                fail("Should fail with LogNotFoundException since the stream is deleted");
+            } catch (LogNotFoundException lnfe) {
+                // expected
+            } finally {
+                // Close on every path, including unexpected exception types.
+                FutureUtils.result(reader101.asyncClose());
+            }
+        } finally {
+            dlm101.close();
+        }
+
+        txid = 201;
+        for (long i = 1; i <= 10; i++) {
+            long curTxId = txid++;
+            logger.debug("Write entry {} to stream {}.", curTxId, name);
+            dlClient.dlClient.write(name,
+                    ByteBuffer.wrap(("" + curTxId).getBytes(UTF_8))).get();
+        }
+        Thread.sleep(1000);
+
+        DistributedLogManager dlm201 = DLMTestUtil.createNewDLM(name, conf, getUri());
+        try {
+            LogReader reader201 = dlm201.getInputStream(1);
+            try {
+                int numRead = 0;
+                int curTxId = 201;
+                LogRecord r = reader201.readNext(false);
+                while (null != r) {
+                    int i = Integer.parseInt(new String(r.getPayload(), UTF_8));
+                    assertEquals(curTxId++, i);
+                    ++numRead;
+                    r = reader201.readNext(false);
+                }
+                assertEquals(10, numRead);
+            } finally {
+                reader201.close();
+            }
+        } finally {
+            dlm201.close();
+        }
+    }
+
+    /**
+     * On the no-ad-hoc cluster: checks the stream is not acquired initially, creates it
+     * explicitly, writes ten records, and checks the server then reports it as acquired.
+     * The cluster is torn down on every path.
+     */
+    @Test(timeout = 60000)
+    public void testCreateStream() throws Exception {
+        try {
+            setupNoAdHocCluster();
+            final String name = "dlserver-create-stream";
+
+            // Route by the same variable used below so the names cannot drift apart.
+            noAdHocClient.routingService.addHost(name, noAdHocServer.getAddress());
+            assertFalse(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
+            assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn());
+
+            long txid = 101;
+            for (long i = 1; i <= 10; i++) {
+                long curTxId = txid++;
+                logger.debug("Write entry {} to stream {}.", curTxId, name);
+                noAdHocClient.dlClient.write(name,
+                    ByteBuffer.wrap(("" + curTxId).getBytes(UTF_8))).get();
+            }
+
+            assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
+        } finally {
+            tearDownNoAdHocCluster();
+        }
+    }
+
+    /**
+     * Tests that create has touch-like behavior: creating an already existing stream a second
+     * time succeeds and leaves the stream acquired on the server.
+     */
+    @Test(timeout = 60000)
+    public void testCreateStreamTwice() throws Exception {
+        try {
+            setupNoAdHocCluster();
+            final String name = "dlserver-create-stream-twice";
+
+            // Route by the same variable used below so the names cannot drift apart.
+            noAdHocClient.routingService.addHost(name, noAdHocServer.getAddress());
+            assertFalse(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
+            assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn());
+
+            long txid = 101;
+            for (long i = 1; i <= 10; i++) {
+                long curTxId = txid++;
+                logger.debug("Write entry {} to stream {}.", curTxId, name);
+                noAdHocClient.dlClient.write(name,
+                    ByteBuffer.wrap(("" + curTxId).getBytes(UTF_8))).get();
+            }
+
+            assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
+
+            // create again
+            assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn());
+            assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
+        } finally {
+            tearDownNoAdHocCluster();
+        }
+    }
+
+
+
+    /**
+     * Writes 30 records in three batches of ten, releasing the stream after each of the first
+     * two batches, then truncates at the DLSN of record 21. Reading back is expected to yield
+     * 20 records starting at 11.
+     */
+    @Test(timeout = 60000)
+    public void testTruncateStream() throws Exception {
+        String name = "dlserver-truncate-stream";
+
+        dlClient.routingService.addHost(name, dlServer.getAddress());
+
+        long txid = 1;
+        Map<Long, DLSN> txid2DLSN = new HashMap<Long, DLSN>();
+        for (int s = 1; s <= 3; s++) {
+            for (long i = 1; i <= 10; i++) {
+                long curTxId = txid++;
+                logger.debug("Write entry {} to stream {}.", curTxId, name);
+                DLSN dlsn = dlClient.dlClient.write(name,
+                        ByteBuffer.wrap(("" + curTxId).getBytes(UTF_8))).get();
+                txid2DLSN.put(curTxId, dlsn);
+            }
+            if (s <= 2) {
+                dlClient.dlClient.release(name).get();
+            }
+        }
+
+        DLSN dlsnToDelete = txid2DLSN.get(21L);
+        dlClient.dlClient.truncate(name, dlsnToDelete).get();
+
+        DistributedLogManager readDLM = DLMTestUtil.createNewDLM(name, conf, getUri());
+        try {
+            LogReader reader = readDLM.getInputStream(1);
+            try {
+                int numRead = 0;
+                int curTxId = 11;
+                LogRecord r = reader.readNext(false);
+                while (null != r) {
+                    int i = Integer.parseInt(new String(r.getPayload(), UTF_8));
+                    assertEquals(curTxId++, i);
+                    ++numRead;
+                    r = reader.readNext(false);
+                }
+                assertEquals(20, numRead);
+            } finally {
+                // Close even when an assertion fails so the reader is not leaked.
+                reader.close();
+            }
+        } finally {
+            readDLM.close();
+        }
+    }
+
+    /**
+     * Creates a ZooKeeper-backed access control entry with {@code denyWrite} set for the
+     * stream, waits until the server namespace's access control manager stops allowing writes,
+     * and then expects a client write to fail with {@code StatusCode.REQUEST_DENIED}.
+     */
+    @Test(timeout = 60000)
+    public void testRequestDenied() throws Exception {
+        String name = "request-denied";
+
+        dlClient.routingService.addHost(name, dlServer.getAddress());
+
+        AccessControlEntry ace = new AccessControlEntry();
+        ace.setDenyWrite(true);
+        // NOTE(review): zkc is never closed in this method - confirm whether it needs
+        // explicit cleanup.
+        ZooKeeperClient zkc = TestZooKeeperClientBuilder
+                .newBuilder()
+                .uri(getUri())
+                .connectionTimeoutMs(60000)
+                .sessionTimeoutMs(60000)
+                .build();
+        DistributedLogNamespace dlNamespace = dlServer.dlServer.getLeft().getDistributedLogNamespace();
+        BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkc, getUri());
+        // ACL node path: <namespace path>/<ACL root>/<stream name>.
+        String zkPath = getUri().getPath() + "/" + bkdlConfig.getACLRootPath() + "/" + name;
+        ZKAccessControl accessControl = new ZKAccessControl(ace, zkPath);
+        accessControl.create(zkc);
+
+        // Poll until the deny entry is visible to the access control manager; the only bound
+        // on this loop is the test timeout.
+        AccessControlManager acm = dlNamespace.createAccessControlManager();
+        while (acm.allowWrite(name)) {
+            Thread.sleep(100);
+        }
+
+        try {
+            Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
+            fail("Should fail with request denied exception");
+        } catch (DLException dle) {
+            assertEquals(StatusCode.REQUEST_DENIED, dle.getCode());
+        }
+    }
+
+    /**
+     * With the catch-all regex {@code ".*"}, every one of the five written streams is expected
+     * in the new client's ownership view.
+     */
+    @Test(timeout = 60000)
+    public void testNoneStreamNameRegex() throws Exception {
+        final String prefix = "none-stream-name-regex-";
+        final int streamCount = 5;
+
+        final Set<String> allStreams = new HashSet<String>();
+        int idx = 0;
+        while (idx < streamCount) {
+            allStreams.add(prefix + idx);
+            idx++;
+        }
+
+        testStreamNameRegex(allStreams, ".*", allStreams);
+    }
+
+    /**
+     * With a regex equal to one exact stream name, only that stream out of the five written is
+     * expected in the new client's ownership view.
+     */
+    @Test(timeout = 60000)
+    public void testStreamNameRegex() throws Exception {
+        final String prefix = "stream-name-regex-";
+        final int streamCount = 5;
+
+        final Set<String> allStreams = new HashSet<String>();
+        int idx = 0;
+        while (idx < streamCount) {
+            allStreams.add(prefix + idx);
+            idx++;
+        }
+
+        // The regex is the literal name of stream "1", which is also the only expected match.
+        final String regex = prefix + "1";
+        final Set<String> matching = new HashSet<String>();
+        matching.add(prefix + "1");
+
+        testStreamNameRegex(allStreams, regex, matching);
+    }
+
+    /**
+     * Writes one record to each stream in {@code streams}, then creates a second client with
+     * {@code streamNameRegex}, performs a handshake, and checks that the stream set it reports
+     * for its single proxy has the same size as {@code expectedStreams} and contains only
+     * expected names. The second client is always shut down.
+     *
+     * @param streams         stream names to create by writing to them
+     * @param streamNameRegex regex handed to the newly created client
+     * @param expectedStreams names the new client is expected to report after the handshake
+     */
+    private void testStreamNameRegex(Set<String> streams, String streamNameRegex,
+                                     Set<String> expectedStreams)
+            throws Exception {
+        for (String stream : streams) {
+            dlClient.routingService.addHost(stream, dlServer.getAddress());
+            Future<DLSN> pendingWrite =
+                    dlClient.dlClient.write(stream, ByteBuffer.wrap(stream.getBytes(UTF_8)));
+            Await.result(pendingWrite);
+        }
+
+        DLClient regexClient = createDistributedLogClient(
+                "test-stream-name-regex",
+                streamNameRegex,
+                Optional.<String>absent());
+        try {
+            regexClient.routingService.addHost("unknown", dlServer.getAddress());
+            regexClient.handshake();
+
+            Map<SocketAddress, Set<String>> ownership =
+                    regexClient.dlClient.getStreamOwnershipDistribution();
+            assertEquals(1, ownership.size());
+
+            Set<String> reported = ownership.values().iterator().next();
+            assertNotNull(reported);
+            assertEquals(expectedStreams.size(), reported.size());
+
+            Iterator<String> it = reported.iterator();
+            while (it.hasNext()) {
+                assertTrue(expectedStreams.contains(it.next()));
+            }
+        } finally {
+            regexClient.shutdown();
+        }
+    }
+
+    /**
+     * After one write the stream must appear in both the client and server views; after an
+     * explicit release it must be gone from both.
+     */
+    @Test(timeout = 60000)
+    public void testReleaseStream() throws Exception {
+        final String streamName = "dlserver-release-stream";
+        dlClient.routingService.addHost(streamName, dlServer.getAddress());
+
+        // Acquire the stream by writing a single record.
+        final ByteBuffer payload = ByteBuffer.wrap("1".getBytes(UTF_8));
+        Await.result(dlClient.dlClient.write(streamName, payload));
+        checkStream(1, 1, 1, streamName, dlServer.getAddress(), true, true);
+
+        // Give it back and verify both sides forgot it.
+        Await.result(dlClient.dlClient.release(streamName));
+        checkStream(0, 0, 0, streamName, dlServer.getAddress(), false, false);
+    }
+
+    /**
+     * Asserts the client-side ownership view and the server-side stream maps for one stream.
+     *
+     * @param expectedNumProxiesInClient number of proxies expected in the client's ownership
+     *                                   distribution
+     * @param expectedClientCacheSize    number of streams expected under the first proxy entry
+     *                                   (only checked when at least one proxy is expected)
+     * @param expectedServerCacheSize    number of streams expected in both the server's cached
+     *                                   and acquired stream maps
+     * @param name                       stream name to look for
+     * @param owner                      address expected as the key of the first proxy entry
+     * @param existedInServer            whether the server maps should contain {@code name}
+     * @param existedInClient            whether the client's first proxy entry should contain
+     *                                   {@code name}
+     */
+    protected void checkStream(int expectedNumProxiesInClient, int expectedClientCacheSize, int expectedServerCacheSize,
+                             String name, SocketAddress owner, boolean existedInServer, boolean existedInClient) {
+        Map<SocketAddress, Set<String>> distribution = dlClient.dlClient.getStreamOwnershipDistribution();
+        assertEquals(expectedNumProxiesInClient, distribution.size());
+
+        if (expectedNumProxiesInClient > 0) {
+            // Only the first entry is inspected; callers in this class expect at most one proxy.
+            Map.Entry<SocketAddress, Set<String>> localEntry =
+                    distribution.entrySet().iterator().next();
+            assertEquals(owner, localEntry.getKey());
+            assertEquals(expectedClientCacheSize, localEntry.getValue().size());
+            assertEquals(existedInClient, localEntry.getValue().contains(name));
+        }
+
+        StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager();
+        Set<String> cachedStreams = streamManager.getCachedStreams().keySet();
+        // Previously this also read getCachedStreams(), so the acquired-stream assertions
+        // below merely repeated the cached-stream ones.
+        Set<String> acquiredStreams = streamManager.getAcquiredStreams().keySet();
+
+        assertEquals(expectedServerCacheSize, cachedStreams.size());
+        assertEquals(existedInServer, cachedStreams.contains(name));
+        assertEquals(expectedServerCacheSize, acquiredStreams.size());
+        assertEquals(existedInServer, acquiredStreams.contains(name));
+    }
+
+}



Mime
View raw message