distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [08/20] incubator-distributedlog git commit: DL-102: Add routing service to write proxy server side
Date Wed, 28 Dec 2016 01:05:16 GMT
DL-102: Add routing service to write proxy server side

This change adds a getOwner RPC to the write proxy, so that the client side can be changed to
get the owner from the write proxy first for the routing service. In this way, we can start
experimenting with any resource placement algorithms.


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/16d73c35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/16d73c35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/16d73c35

Branch: refs/heads/master
Commit: 16d73c35ee45b99594bc7c840dc028337b7de859
Parents: 9ee7d01
Author: Leigh Stewart <lstewart@apache.org>
Authored: Thu Jul 28 21:36:49 2016 -0700
Committer: Sijie Guo <sijieg@twitter.com>
Committed: Tue Dec 27 16:49:27 2016 -0800

----------------------------------------------------------------------
 .../routing/SingleHostRoutingService.java       |  9 ++-
 .../proxy/MockDistributedLogServices.java       |  5 ++
 .../distributedlog/BKDistributedLogManager.java |  1 +
 .../src/main/thrift/service.thrift              |  7 +-
 .../service/DistributedLogCluster.java          | 40 ++++++++++--
 .../service/DistributedLogServer.java           | 20 +++++-
 .../service/DistributedLogServerApp.java        | 18 +++++-
 .../service/DistributedLogServiceImpl.java      | 68 ++++++++++++++++++++
 .../distributedlog/service/ResponseUtils.java   |  4 ++
 .../client/routing/LocalRoutingService.java     |  3 +-
 .../service/DistributedLogServerTestCase.java   |  7 +-
 .../service/TestDistributedLogService.java      | 49 ++++++++++++++
 12 files changed, 214 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/SingleHostRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/SingleHostRoutingService.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/SingleHostRoutingService.java
index 15356ff..e526868 100644
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/SingleHostRoutingService.java
+++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/SingleHostRoutingService.java
@@ -29,9 +29,8 @@ import java.util.concurrent.CopyOnWriteArraySet;
 /**
  * Single Host Routing Service.
  */
-class SingleHostRoutingService implements RoutingService {
+public class SingleHostRoutingService implements RoutingService {
 
-    @Deprecated
     public static SingleHostRoutingService of(SocketAddress address) {
         return new SingleHostRoutingService(address);
     }
@@ -71,7 +70,7 @@ class SingleHostRoutingService implements RoutingService {
         }
     }
 
-    private final SocketAddress address;
+    private SocketAddress address;
     private final CopyOnWriteArraySet<RoutingListener> listeners =
             new CopyOnWriteArraySet<RoutingListener>();
 
@@ -79,6 +78,10 @@ class SingleHostRoutingService implements RoutingService {
         this.address = address;
     }
 
+    public void setAddress(SocketAddress address) {
+        this.address = address;
+    }
+
     @Override
     public Set<SocketAddress> getHosts() {
         return Sets.newHashSet(address);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockDistributedLogServices.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockDistributedLogServices.java b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockDistributedLogServices.java
index 13ba044..f088c0d 100644
--- a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockDistributedLogServices.java
+++ b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockDistributedLogServices.java
@@ -105,6 +105,11 @@ public class MockDistributedLogServices {
         }
 
         @Override
+        public Future<WriteResponse> getOwner(String stream, WriteContext ctx) {
+            return Future.value(new WriteResponse());
+        }
+
+        @Override
         public Future<Void> setAcceptNewStream(boolean enabled) {
             return Future.value(null);
         }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
index 75a5b83..ae8ae12 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
@@ -75,6 +75,7 @@ import org.apache.bookkeeper.stats.AlertStatsLogger;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZKUtil;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-protocol/src/main/thrift/service.thrift
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/thrift/service.thrift b/distributedlog-protocol/src/main/thrift/service.thrift
index 4c0eaf1..a25af63 100644
--- a/distributedlog-protocol/src/main/thrift/service.thrift
+++ b/distributedlog-protocol/src/main/thrift/service.thrift
@@ -94,7 +94,7 @@ enum StatusCode {
     CHECKSUM_FAILED = 523,
     /* Overcapacity: too many streams */
     TOO_MANY_STREAMS = 524,
-    // Log Segment Not Found
+    /* Log Segment Not Found */
     LOG_SEGMENT_NOT_FOUND = 525,
 
     /* 6xx: unexpected */
@@ -167,14 +167,17 @@ struct ClientInfo {
 
 service DistributedLogService {
 
+    /* Deprecated */
     ServerInfo handshake();
 
     ServerInfo handshakeWithClientInfo(ClientInfo clientInfo);
 
+    /* Deprecated */
     WriteResponse heartbeat(string stream, WriteContext ctx);
 
     WriteResponse heartbeatWithOptions(string stream, WriteContext ctx, HeartbeatOptions options);
 
+    /* Deprecated */
     WriteResponse write(string stream, binary data);
 
     WriteResponse writeWithContext(string stream, binary data, WriteContext ctx);
@@ -189,6 +192,8 @@ service DistributedLogService {
 
     WriteResponse delete(string stream, WriteContext ctx);
 
+    WriteResponse getOwner(string stream, WriteContext ctx);
+
     /* Admin Methods */
     void setAcceptNewStream(bool enabled);
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
index 3e7948d..0ce335b 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
@@ -19,6 +19,7 @@ package com.twitter.distributedlog.service;
 
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.LocalDLMEmulator;
+import com.twitter.distributedlog.client.routing.SingleHostRoutingService;
 import com.twitter.distributedlog.metadata.BKDLConfig;
 import com.twitter.distributedlog.metadata.DLMetadata;
 import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter;
@@ -63,6 +64,7 @@ public class DistributedLogCluster {
         int _zkPort = 0;
         boolean _shouldStartProxy = true;
         int _proxyPort = 7000;
+        boolean _thriftmux = false;
         DistributedLogConfiguration _dlConf = new DistributedLogConfiguration()
                 .setLockTimeout(10)
                 .setOutputBufferSize(0)
@@ -165,6 +167,17 @@ public class DistributedLogCluster {
             return this;
         }
 
+        /**
+         * Enable thriftmux for the dl server
+         *
+         * @param enabled flag to enable thriftmux
+         * @return builder
+         */
+        public Builder thriftmux(boolean enabled) {
+            this._thriftmux = enabled;
+            return this;
+        }
+
         public DistributedLogCluster build() throws Exception {
             // build the cluster
             return new DistributedLogCluster(
@@ -175,7 +188,8 @@ public class DistributedLogCluster {
                     _zkHost,
                     _zkPort,
                     _shouldStartProxy,
-                    _proxyPort);
+                    _proxyPort,
+                    _thriftmux);
         }
     }
 
@@ -189,8 +203,12 @@ public class DistributedLogCluster {
 
         public final InetSocketAddress address;
         public final Pair<DistributedLogServiceImpl, Server> dlServer;
+        private final SingleHostRoutingService routingService = SingleHostRoutingService.of(null);
 
-        protected DLServer(DistributedLogConfiguration dlConf, URI uri, int basePort) throws Exception {
+        protected DLServer(DistributedLogConfiguration dlConf,
+                           URI uri,
+                           int basePort,
+                           boolean thriftmux) throws Exception {
             proxyPort = basePort;
 
             boolean success = false;
@@ -207,8 +225,12 @@ public class DistributedLogCluster {
                             dlConf,
                             uri,
                             new IdentityStreamPartitionConverter(),
+                            routingService,
                             new NullStatsProvider(),
-                            proxyPort);
+                            proxyPort,
+                            thriftmux);
+                    routingService.setAddress(DLSocketAddress.getSocketAddress(proxyPort));
+                    routingService.startService();
                     success = true;
                 } catch (BindException be) {
                     retries++;
@@ -234,6 +256,7 @@ public class DistributedLogCluster {
 
         public void shutdown() {
             DistributedLogServer.closeServer(dlServer, 0, TimeUnit.MILLISECONDS);
+            routingService.stopService();
         }
     }
 
@@ -243,6 +266,7 @@ public class DistributedLogCluster {
     private DLServer dlServer;
     private final boolean shouldStartProxy;
     private final int proxyPort;
+    private final boolean thriftmux;
     private final List<File> tmpDirs = new ArrayList<File>();
 
     private DistributedLogCluster(DistributedLogConfiguration dlConf,
@@ -252,7 +276,8 @@ public class DistributedLogCluster {
                                   String zkServers,
                                   int zkPort,
                                   boolean shouldStartProxy,
-                                  int proxyPort) throws Exception {
+                                  int proxyPort,
+                                  boolean thriftmux) throws Exception {
         this.dlConf = dlConf;
         if (shouldStartZK) {
             File zkTmpDir = IOUtils.createTempDir("zookeeper", "distrlog");
@@ -276,6 +301,7 @@ public class DistributedLogCluster {
                 .build();
         this.shouldStartProxy = shouldStartProxy;
         this.proxyPort = proxyPort;
+        this.thriftmux = thriftmux;
     }
 
     public void start() throws Exception {
@@ -283,7 +309,11 @@ public class DistributedLogCluster {
         BKDLConfig bkdlConfig = new BKDLConfig(this.dlmEmulator.getZkServers(), "/ledgers").setACLRootPath(".acl");
         DLMetadata.create(bkdlConfig).update(this.dlmEmulator.getUri());
         if (shouldStartProxy) {
-            this.dlServer = new DLServer(dlConf, this.dlmEmulator.getUri(), proxyPort);
+            this.dlServer = new DLServer(
+                    dlConf,
+                    this.dlmEmulator.getUri(),
+                    proxyPort,
+                    thriftmux);
         } else {
             this.dlServer = null;
         }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
index 6ef99b8..185ea82 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
@@ -20,6 +20,7 @@ package com.twitter.distributedlog.service;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.client.routing.RoutingService;
 import com.twitter.distributedlog.config.DynamicConfigurationFactory;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.service.announcer.Announcer;
@@ -73,6 +74,7 @@ public class DistributedLogServer {
 
     private DistributedLogServiceImpl dlService = null;
     private Server server = null;
+    private RoutingService routingService;
     private StatsProvider statsProvider;
     private Announcer announcer = null;
     private ScheduledExecutorService configExecutorService;
@@ -97,6 +99,7 @@ public class DistributedLogServer {
                          Optional<Integer> shardId,
                          Optional<Boolean> announceServerSet,
                          Optional<Boolean> thriftmux,
+                         RoutingService routingService,
                          StatsReceiver statsReceiver,
                          StatsProvider statsProvider) {
         this.uri = uri;
@@ -107,6 +110,7 @@ public class DistributedLogServer {
         this.shardId = shardId;
         this.announceServerSet = announceServerSet;
         this.thriftmux = thriftmux;
+        this.routingService = routingService;
         this.statsReceiver = statsReceiver;
         this.statsProvider = statsProvider;
     }
@@ -183,6 +187,7 @@ public class DistributedLogServer {
                 dynDlConf,
                 dlUri,
                 converter,
+                routingService,
                 statsProvider,
                 port.or(0),
                 keepAliveLatch,
@@ -195,6 +200,9 @@ public class DistributedLogServer {
 
         // announce the service
         announcer.announce();
+        // start the routing service after announced
+        routingService.startService();
+        logger.info("Started the routing service.");
     }
 
     protected void preRun(DistributedLogConfiguration conf, ServerConfiguration serverConf) {
@@ -245,19 +253,22 @@ public class DistributedLogServer {
             DistributedLogConfiguration dlConf,
             URI dlUri,
             StreamPartitionConverter converter,
+            RoutingService routingService,
             StatsProvider provider,
-            int port) throws IOException {
+            int port,
+            boolean thriftmux) throws IOException {
 
         return runServer(serverConf,
                 dlConf,
                 ConfUtils.getConstDynConf(dlConf),
                 dlUri,
                 converter,
+                routingService,
                 provider,
                 port,
                 new CountDownLatch(0),
                 new NullStatsReceiver(),
-                false,
+                thriftmux,
                 new NullStreamConfigProvider());
     }
 
@@ -267,6 +278,7 @@ public class DistributedLogServer {
             DynamicDistributedLogConfiguration dynDlConf,
             URI dlUri,
             StreamPartitionConverter partitionConverter,
+            RoutingService routingService,
             StatsProvider provider,
             int port,
             CountDownLatch keepAliveLatch,
@@ -291,6 +303,7 @@ public class DistributedLogServer {
                 streamConfProvider,
                 dlUri,
                 partitionConverter,
+                routingService,
                 provider.getStatsLogger(""),
                 perStreamStatsLogger,
                 keepAliveLatch);
@@ -358,6 +371,7 @@ public class DistributedLogServer {
             announcer.close();
         }
         closeServer(Pair.of(dlService, server), gracefulShutdownMs, TimeUnit.MILLISECONDS);
+        routingService.stopService();
         if (null != statsProvider) {
             statsProvider.stop();
         }
@@ -396,6 +410,7 @@ public class DistributedLogServer {
                Optional<Integer> shardId,
                Optional<Boolean> announceServerSet,
                Optional<Boolean> thriftmux,
+               RoutingService routingService,
                StatsReceiver statsReceiver,
                StatsProvider statsProvider)
             throws ConfigurationException, IllegalArgumentException, IOException {
@@ -409,6 +424,7 @@ public class DistributedLogServer {
                 shardId,
                 announceServerSet,
                 thriftmux,
+                routingService,
                 statsReceiver,
                 statsProvider);
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
index a339261..af36307 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
@@ -19,7 +19,11 @@ package com.twitter.distributedlog.service;
 
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.client.routing.RoutingService;
+import com.twitter.distributedlog.client.routing.RoutingUtils;
+import com.twitter.distributedlog.client.serverset.DLZkServerSet;
 import com.twitter.finagle.stats.NullStatsReceiver;
 import com.twitter.finagle.stats.StatsReceiver;
 import org.apache.bookkeeper.stats.NullStatsProvider;
@@ -38,7 +42,9 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
+import java.net.URI;
 import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
 
 import static com.twitter.distributedlog.util.CommandLineUtils.*;
 
@@ -119,8 +125,17 @@ public class DistributedLogServerApp {
                     }
                 }).or(new NullStatsProvider());
 
+        final Optional<String> uriOption = getOptionalStringArg(cmdline, "u");
+        Preconditions.checkArgument(uriOption.isPresent(), "No distributedlog uri provided.");
+        URI dlUri = URI.create(uriOption.get());
+
+        DLZkServerSet serverSet = DLZkServerSet.of(dlUri, (int) TimeUnit.SECONDS.toMillis(60));
+        RoutingService routingService = RoutingUtils.buildRoutingService(serverSet.getServerSet())
+                .statsReceiver(statsReceiver.scope("routing"))
+                .build();
+
         final DistributedLogServer server = DistributedLogServer.runServer(
-                getOptionalStringArg(cmdline, "u"),
+                uriOption,
                 confOptional,
                 getOptionalStringArg(cmdline, "sc"),
                 getOptionalIntegerArg(cmdline, "p"),
@@ -128,6 +143,7 @@ public class DistributedLogServerApp {
                 getOptionalIntegerArg(cmdline, "si"),
                 getOptionalBooleanArg(cmdline, "a"),
                 getOptionalBooleanArg(cmdline, "mx"),
+                routingService,
                 statsReceiver,
                 statsProvider);
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
index f529927..12820d9 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
@@ -24,6 +24,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.twitter.distributedlog.DLSN;
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.acl.AccessControlManager;
+import com.twitter.distributedlog.client.resolver.RegionResolver;
+import com.twitter.distributedlog.client.resolver.TwitterRegionResolver;
+import com.twitter.distributedlog.client.routing.RoutingService;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.exceptions.RegionUnavailableException;
 import com.twitter.distributedlog.exceptions.ServiceUnavailableException;
@@ -67,6 +70,7 @@ import com.twitter.distributedlog.rate.MovingAverageRate;
 import com.twitter.distributedlog.util.ConfUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.SchedulerUtils;
+import com.twitter.finagle.NoBrokersAvailableException;
 import com.twitter.util.Await;
 import com.twitter.util.Duration;
 import com.twitter.util.Function0;
@@ -85,6 +89,8 @@ import org.slf4j.LoggerFactory;
 import scala.runtime.BoxedUnit;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -116,6 +122,8 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
     private final StreamConfigProvider streamConfigProvider;
     private final StreamManager streamManager;
     private final StreamFactory streamFactory;
+    private final RoutingService routingService;
+    private final RegionResolver regionResolver;
     private final MovingAverageRateFactory movingAvgFactory;
     private final MovingAverageRate windowedRps;
     private final MovingAverageRate windowedBps;
@@ -149,6 +157,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
                               StreamConfigProvider streamConfigProvider,
                               URI uri,
                               StreamPartitionConverter converter,
+                              RoutingService routingService,
                               StatsLogger statsLogger,
                               StatsLogger perStreamStatsLogger,
                               CountDownLatch keepAliveLatch)
@@ -224,6 +233,8 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
                 converter,
                 streamConfigProvider,
                 dlNamespace);
+        this.routingService = routingService;
+        this.regionResolver = new TwitterRegionResolver();
 
         // Service features
         this.featureRegionStopAcceptNewStream = this.featureProvider.getFeature(
@@ -467,6 +478,53 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
         return op.result();
     }
 
+    //
+    // Ownership RPC
+    //
+
+    @Override
+    public Future<WriteResponse> getOwner(String streamName, WriteContext ctx) {
+        if (streamManager.isAcquired(streamName)) {
+            // the stream is already acquired
+            return Future.value(new WriteResponse(ResponseUtils.ownerToHeader(clientId)));
+        }
+
+        Stream stream = streamManager.getStream(streamName);
+        String owner;
+        if (null != stream && null != (owner = stream.getOwner())) {
+            return Future.value(new WriteResponse(ResponseUtils.ownerToHeader(owner)));
+        }
+
+        RoutingService.RoutingContext routingContext = RoutingService.RoutingContext.of(regionResolver);
+
+        if (ctx.isSetTriedHosts()) {
+            for (String triedHost : ctx.getTriedHosts()) {
+                routingContext.addTriedHost(
+                        DLSocketAddress.parseSocketAddress(triedHost), StatusCode.STREAM_UNAVAILABLE);
+            }
+        }
+
+        try {
+            SocketAddress host = routingService.getHost(streamName, routingContext);
+            if (host instanceof InetSocketAddress) {
+                // use shard id '-1' as the shard id here won't be used for redirection
+                return Future.value(new WriteResponse(
+                        ResponseUtils.ownerToHeader(DLSocketAddress.toLockId((InetSocketAddress) host, -1))));
+            } else {
+                return Future.value(new WriteResponse(
+                        ResponseUtils.streamUnavailableHeader()));
+            }
+        } catch (NoBrokersAvailableException e) {
+            return Future.value(new WriteResponse(
+                    ResponseUtils.streamUnavailableHeader()));
+        }
+    }
+
+
+    //
+    // Admin RPCs
+    //
+
     @Override
     public Future<Void> setAcceptNewStream(boolean enabled) {
         closeLock.writeLock().lock();
@@ -656,6 +714,16 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
         return newWriteOp(stream, data, checksum, false);
     }
 
+    @VisibleForTesting
+    RoutingService getRoutingService() {
+        return this.routingService;
+    }
+
+    @VisibleForTesting
+    DLSocketAddress getServiceAddress() throws IOException {
+        return DLSocketAddress.deserialize(clientId);
+    }
+
     WriteOp newWriteOp(String stream,
                        ByteBuffer data,
                        Long checksum,

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ResponseUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ResponseUtils.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ResponseUtils.java
index 0bceec5..cee9dba 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ResponseUtils.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ResponseUtils.java
@@ -32,6 +32,10 @@ public class ResponseUtils {
         return new ResponseHeader(StatusCode.REQUEST_DENIED);
     }
 
+    public static ResponseHeader streamUnavailableHeader() {
+        return new ResponseHeader(StatusCode.STREAM_UNAVAILABLE);
+    }
+
     public static ResponseHeader successHeader() {
         return new ResponseHeader(StatusCode.SUCCESS);
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java
index 475755b..10941ba 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java
@@ -92,7 +92,7 @@ public class LocalRoutingService implements RoutingService {
         return this;
     }
 
-    public void addHost(String stream, SocketAddress address) {
+    public LocalRoutingService addHost(String stream, SocketAddress address) {
         boolean notify = false;
         synchronized (this) {
             LinkedHashSet<SocketAddress> addresses = localAddresses.get(stream);
@@ -109,6 +109,7 @@ public class LocalRoutingService implements RoutingService {
                 listener.onServerJoin(address);
             }
         }
+        return this;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java
index 486a106..c37248c 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java
@@ -34,7 +34,6 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -149,7 +148,7 @@ public abstract class DistributedLogServerTestCase {
     public void setupNoAdHocCluster() throws Exception {
         noAdHocCluster = createCluster(noAdHocConf);
         noAdHocCluster.start();
-        noAdHocServer = new DLServer(noAdHocConf, noAdHocCluster.getUri(), 7002);
+        noAdHocServer = new DLServer(noAdHocConf, noAdHocCluster.getUri(), 7002, false);
         noAdHocClient = createDistributedLogClient("no-ad-hoc-client");
     }
 
@@ -193,12 +192,12 @@ public abstract class DistributedLogServerTestCase {
     }
 
     protected DLServer createDistributedLogServer(int port) throws Exception {
-        return new DLServer(conf, dlCluster.getUri(), port);
+        return new DLServer(conf, dlCluster.getUri(), port, false);
     }
 
     protected DLServer createDistributedLogServer(DistributedLogConfiguration conf, int port)
             throws Exception {
-        return new DLServer(conf, dlCluster.getUri(), port);
+        return new DLServer(conf, dlCluster.getUri(), port, false);
     }
 
     protected DLClient createDistributedLogClient(String clientName) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
index 4195ed3..d7a0ba6 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
@@ -23,6 +23,7 @@ import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.util.ProtocolUtils;
 import com.twitter.distributedlog.TestDistributedLogBase;
 import com.twitter.distributedlog.acl.DefaultAccessControlManager;
+import com.twitter.distributedlog.client.routing.LocalRoutingService;
 import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
 import com.twitter.distributedlog.exceptions.StreamUnavailableException;
 import com.twitter.distributedlog.service.config.NullStreamConfigProvider;
@@ -145,6 +146,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
                 new NullStreamConfigProvider(),
                 uri,
                 converter,
+                new LocalRoutingService(),
                 NullStatsLogger.INSTANCE,
                 NullStatsLogger.INSTANCE,
                 latch);
@@ -769,4 +771,51 @@ public class TestDistributedLogService extends TestDistributedLogBase {
                 streamManager.getAcquiredStreams().isEmpty());
     }
 
+    @Test(timeout = 60000)
+    public void testGetOwner() throws Exception {
+        ((LocalRoutingService) service.getRoutingService())
+                .addHost("stream-0", service.getServiceAddress().getSocketAddress())
+                .setAllowRetrySameHost(false);
+
+        // routing service doesn't know 'stream-1'
+        WriteResponse response = FutureUtils.result(service.getOwner("stream-1", new WriteContext()));
+        assertEquals(StatusCode.STREAM_UNAVAILABLE, response.getHeader().getCode());
+
+        // service cache "stream-2" but not acquire
+        StreamImpl stream = (StreamImpl) service.getStreamManager().getOrCreateStream("stream-2", false);
+        response = FutureUtils.result(service.getOwner("stream-2", new WriteContext()));
+        assertEquals(StatusCode.STREAM_UNAVAILABLE, response.getHeader().getCode());
+
+        // create write ops to stream-2 to make service acquire the stream
+        WriteOp op = createWriteOp(service, "stream-2", 0L);
+        stream.submit(op);
+        stream.start();
+        WriteResponse wr = Await.result(op.result());
+        assertEquals("Op  should succeed",
+                StatusCode.SUCCESS, wr.getHeader().getCode());
+        assertEquals("Service should acquire stream",
+                StreamStatus.INITIALIZED, stream.getStatus());
+        assertNotNull(stream.getManager());
+        assertNotNull(stream.getWriter());
+        assertNull(stream.getLastException());
+
+        // the stream is acquired
+        response = FutureUtils.result(service.getOwner("stream-2", new WriteContext()));
+        assertEquals(StatusCode.FOUND, response.getHeader().getCode());
+        assertEquals(service.getServiceAddress().toString(),
+                response.getHeader().getLocation());
+
+        // find the stream from the routing service
+        response = FutureUtils.result(service.getOwner("stream-0", new WriteContext()));
+        assertEquals(StatusCode.FOUND, response.getHeader().getCode());
+        assertEquals(service.getServiceAddress().toString(),
+                response.getHeader().getLocation());
+
+        // add the tried host
+        WriteContext ctx = new WriteContext();
+        ctx.addToTriedHosts(DLSocketAddress.toString(service.getServiceAddress().getSocketAddress()));
+        response = FutureUtils.result(service.getOwner("stream-0", ctx));
+        assertEquals(StatusCode.STREAM_UNAVAILABLE, response.getHeader().getCode());
+    }
+
 }


Mime
View raw message