distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [44/51] [partial] incubator-distributedlog git commit: DL-4: Repackage the source under apache namespace
Date Thu, 05 Jan 2017 00:51:49 GMT
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
deleted file mode 100644
index 3f65aff..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
+++ /dev/null
@@ -1,608 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.service;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.twitter.common.zookeeper.ServerSet;
-import com.twitter.distributedlog.client.ClientConfig;
-import com.twitter.distributedlog.client.DistributedLogClientImpl;
-import com.twitter.distributedlog.client.monitor.MonitorServiceClient;
-import com.twitter.distributedlog.client.proxy.ClusterClient;
-import com.twitter.distributedlog.client.resolver.DefaultRegionResolver;
-import com.twitter.distributedlog.client.resolver.RegionResolver;
-import com.twitter.distributedlog.client.routing.RegionsRoutingService;
-import com.twitter.distributedlog.client.routing.RoutingService;
-import com.twitter.distributedlog.client.routing.RoutingUtils;
-import com.twitter.distributedlog.thrift.service.DistributedLogService;
-import com.twitter.finagle.Name;
-import com.twitter.finagle.Resolver$;
-import com.twitter.finagle.Service;
-import com.twitter.finagle.ThriftMux;
-import com.twitter.finagle.builder.ClientBuilder;
-import com.twitter.finagle.stats.NullStatsReceiver;
-import com.twitter.finagle.stats.StatsReceiver;
-import com.twitter.finagle.thrift.ClientId;
-import com.twitter.finagle.thrift.ThriftClientFramedCodec;
-import com.twitter.finagle.thrift.ThriftClientRequest;
-import com.twitter.util.Duration;
-import java.net.SocketAddress;
-import java.net.URI;
-import java.util.Random;
-import org.apache.commons.lang.StringUtils;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-
-/**
- * Builder to build {@link DistributedLogClient}.
- */
-public final class DistributedLogClientBuilder {
-
-    private static final Logger logger = LoggerFactory.getLogger(DistributedLogClientBuilder.class);
-
-    private static final Random random = new Random(System.currentTimeMillis());
-
-    private String name = null;
-    private ClientId clientId = null;
-    private RoutingService.Builder routingServiceBuilder = null;
-    private ClientBuilder clientBuilder = null;
-    private String serverRoutingServiceFinagleName = null;
-    private StatsReceiver statsReceiver = new NullStatsReceiver();
-    private StatsReceiver streamStatsReceiver = new NullStatsReceiver();
-    private ClientConfig clientConfig = new ClientConfig();
-    private boolean enableRegionStats = false;
-    private final RegionResolver regionResolver = new DefaultRegionResolver();
-
-    /**
-     * Create a client builder.
-     *
-     * @return client builder
-     */
-    public static DistributedLogClientBuilder newBuilder() {
-        return new DistributedLogClientBuilder();
-    }
-
-    /**
-     * Create a new client builder from an existing {@code builder}.
-     *
-     * @param builder the existing builder.
-     * @return a new client builder.
-     */
-    public static DistributedLogClientBuilder newBuilder(DistributedLogClientBuilder builder) {
-        DistributedLogClientBuilder newBuilder = new DistributedLogClientBuilder();
-        newBuilder.name = builder.name;
-        newBuilder.clientId = builder.clientId;
-        newBuilder.clientBuilder = builder.clientBuilder;
-        newBuilder.routingServiceBuilder = builder.routingServiceBuilder;
-        newBuilder.statsReceiver = builder.statsReceiver;
-        newBuilder.streamStatsReceiver = builder.streamStatsReceiver;
-        newBuilder.enableRegionStats = builder.enableRegionStats;
-        newBuilder.serverRoutingServiceFinagleName = builder.serverRoutingServiceFinagleName;
-        newBuilder.clientConfig = ClientConfig.newConfig(builder.clientConfig);
-        return newBuilder;
-    }
-
-    // private constructor
-    private DistributedLogClientBuilder() {}
-
-    /**
-     * Client Name.
-     *
-     * @param name
-     *          client name
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder name(String name) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.name = name;
-        return newBuilder;
-    }
-
-    /**
-     * Client ID.
-     *
-     * @param clientId
-     *          client id
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder clientId(ClientId clientId) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientId = clientId;
-        return newBuilder;
-    }
-
-    /**
-     * Serverset to access proxy services.
-     *
-     * @param serverSet
-     *          server set.
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder serverSet(ServerSet serverSet) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(serverSet);
-        newBuilder.enableRegionStats = false;
-        return newBuilder;
-    }
-
-    /**
-     * Server Sets to access proxy services.
-     *
-     * <p>The <i>local</i> server set will be tried first then <i>remotes</i>.
-     *
-     * @param local local server set.
-     * @param remotes remote server sets.
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder serverSets(ServerSet local, ServerSet...remotes) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        RoutingService.Builder[] builders = new RoutingService.Builder[remotes.length + 1];
-        builders[0] = RoutingUtils.buildRoutingService(local);
-        for (int i = 1; i < builders.length; i++) {
-            builders[i] = RoutingUtils.buildRoutingService(remotes[i - 1]);
-        }
-        newBuilder.routingServiceBuilder = RegionsRoutingService.newBuilder()
-                .resolver(regionResolver)
-                .routingServiceBuilders(builders);
-        newBuilder.enableRegionStats = remotes.length > 0;
-        return newBuilder;
-    }
-
-    /**
-     * Name to access proxy services.
-     *
-     * @param finagleNameStr
-     *          finagle name string.
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder finagleNameStr(String finagleNameStr) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(finagleNameStr);
-        newBuilder.enableRegionStats = false;
-        return newBuilder;
-    }
-
-    /**
-     * Finagle name strs to access proxy services.
-     *
-     * <p>The <i>local</i> finalge name str will be tried first, then <i>remotes</i>.
-     *
-     * @param local local server set.
-     * @param remotes remote server sets.
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder finagleNameStrs(String local, String...remotes) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        RoutingService.Builder[] builders = new RoutingService.Builder[remotes.length + 1];
-        builders[0] = RoutingUtils.buildRoutingService(local);
-        for (int i = 1; i < builders.length; i++) {
-            builders[i] = RoutingUtils.buildRoutingService(remotes[i - 1]);
-        }
-        newBuilder.routingServiceBuilder = RegionsRoutingService.newBuilder()
-                .routingServiceBuilders(builders)
-                .resolver(regionResolver);
-        newBuilder.enableRegionStats = remotes.length > 0;
-        return newBuilder;
-    }
-
-    /**
-     * URI to access proxy services.
-     *
-     * <p>Assuming the write proxies are announced under `.write_proxy` of the provided namespace uri.
-     * The builder will convert the dl uri (e.g. distributedlog://{zkserver}/path/to/namespace) to
-     * zookeeper serverset based finagle name str (`zk!{zkserver}!/path/to/namespace/.write_proxy`)
-     *
-     * @param uri namespace uri to access the serverset of write proxies
-     * @return distributedlog builder
-     */
-    public DistributedLogClientBuilder uri(URI uri) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        String zkServers = uri.getAuthority().replace(";", ",");
-        String[] zkServerList = StringUtils.split(zkServers, ',');
-        String finagleNameStr = String.format(
-                "zk!%s!%s/.write_proxy",
-                zkServerList[random.nextInt(zkServerList.length)], // zk server
-                uri.getPath());
-        newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(finagleNameStr);
-        newBuilder.enableRegionStats = false;
-        return newBuilder;
-    }
-
-    /**
-     * Address of write proxy to connect.
-     *
-     * @param address
-     *          write proxy address.
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder host(SocketAddress address) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(address);
-        newBuilder.enableRegionStats = false;
-        return newBuilder;
-    }
-
-    private DistributedLogClientBuilder routingServiceBuilder(RoutingService.Builder builder) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.routingServiceBuilder = builder;
-        newBuilder.enableRegionStats = false;
-        return newBuilder;
-    }
-
-    /**
-     * Routing Service to access proxy services.
-     *
-     * @param routingService
-     *          routing service
-     * @return client builder.
-     */
-    @VisibleForTesting
-    public DistributedLogClientBuilder routingService(RoutingService routingService) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(routingService);
-        newBuilder.enableRegionStats = false;
-        return newBuilder;
-    }
-
-    /**
-     * Stats receiver to expose client stats.
-     *
-     * @param statsReceiver
-     *          stats receiver.
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder statsReceiver(StatsReceiver statsReceiver) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.statsReceiver = statsReceiver;
-        return newBuilder;
-    }
-
-    /**
-     * Stream Stats Receiver to expose per stream stats.
-     *
-     * @param streamStatsReceiver
-     *          stream stats receiver
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder streamStatsReceiver(StatsReceiver streamStatsReceiver) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.streamStatsReceiver = streamStatsReceiver;
-        return newBuilder;
-    }
-
-    /**
-     * Set underlying finagle client builder.
-     *
-     * @param builder
-     *          finagle client builder.
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder clientBuilder(ClientBuilder builder) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientBuilder = builder;
-        return newBuilder;
-    }
-
-    /**
-     * Backoff time when redirecting to an already retried host.
-     *
-     * @param ms
-     *          backoff time.
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder redirectBackoffStartMs(int ms) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setRedirectBackoffStartMs(ms);
-        return newBuilder;
-    }
-
-    /**
-     * Max backoff time when redirecting to an already retried host.
-     *
-     * @param ms
-     *          backoff time.
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder redirectBackoffMaxMs(int ms) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setRedirectBackoffMaxMs(ms);
-        return newBuilder;
-    }
-
-    /**
-     * Max redirects that is allowed per request.
-     *
-     * <p>If <i>redirects</i> are exhausted, fail the request immediately.
-     *
-     * @param redirects
-     *          max redirects allowed before failing a request.
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder maxRedirects(int redirects) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setMaxRedirects(redirects);
-        return newBuilder;
-    }
-
-    /**
-     * Timeout per request in millis.
-     *
-     * @param timeoutMs
-     *          timeout per request in millis.
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder requestTimeoutMs(int timeoutMs) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setRequestTimeoutMs(timeoutMs);
-        return newBuilder;
-    }
-
-    /**
-     * Set thriftmux enabled.
-     *
-     * @param enabled
-     *          is thriftmux enabled
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder thriftmux(boolean enabled) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setThriftMux(enabled);
-        return newBuilder;
-    }
-
-    /**
-     * Set failfast stream exception handling enabled.
-     *
-     * @param enabled
-     *          is failfast exception handling enabled
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder streamFailfast(boolean enabled) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setStreamFailfast(enabled);
-        return newBuilder;
-    }
-
-    /**
-     * Set the regex to match stream names that the client cares about.
-     *
-     * @param nameRegex
-     *          stream name regex
-     * @return client builder
-     */
-    public DistributedLogClientBuilder streamNameRegex(String nameRegex) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setStreamNameRegex(nameRegex);
-        return newBuilder;
-    }
-
-    /**
-     * Whether to use the new handshake endpoint to exchange ownership cache.
-     *
-     * <p>Enable this when the servers are updated to support handshaking with client info.
-     *
-     * @param enabled
-     *          new handshake endpoint is enabled.
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder handshakeWithClientInfo(boolean enabled) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setHandshakeWithClientInfo(enabled);
-        return newBuilder;
-    }
-
-    /**
-     * Set the periodic handshake interval in milliseconds.
-     *
-     * <p>Every <code>intervalMs</code>, the DL client will handshake with existing proxies again.
-     * If the interval is less than ownership sync interval, the handshake won't sync ownerships. Otherwise, it will.
-     *
-     * @see #periodicOwnershipSyncIntervalMs(long)
-     * @param intervalMs
-     *          handshake interval
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder periodicHandshakeIntervalMs(long intervalMs) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setPeriodicHandshakeIntervalMs(intervalMs);
-        return newBuilder;
-    }
-
-    /**
-     * Set the periodic ownership sync interval in milliseconds.
-     *
-     * <p>If periodic handshake is enabled, the handshake will sync ownership if the elapsed time is larger than
-     * sync interval.
-     *
-     * @see #periodicHandshakeIntervalMs(long)
-     * @param intervalMs
-     *          interval that handshake should sync ownerships.
-     * @return client builder
-     */
-    public DistributedLogClientBuilder periodicOwnershipSyncIntervalMs(long intervalMs) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setPeriodicOwnershipSyncIntervalMs(intervalMs);
-        return newBuilder;
-    }
-
-    /**
-     * Enable/Disable periodic dumping ownership cache.
-     *
-     * @param enabled
-     *          flag to enable/disable periodic dumping ownership cache
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder periodicDumpOwnershipCache(boolean enabled) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setPeriodicDumpOwnershipCacheEnabled(enabled);
-        return newBuilder;
-    }
-
-    /**
-     * Set periodic dumping ownership cache interval.
-     *
-     * @param intervalMs
-     *          interval on dumping ownership cache, in millis.
-     * @return client builder
-     */
-    public DistributedLogClientBuilder periodicDumpOwnershipCacheIntervalMs(long intervalMs) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setPeriodicDumpOwnershipCacheIntervalMs(intervalMs);
-        return newBuilder;
-    }
-
-    /**
-     * Enable handshake tracing.
-     *
-     * @param enabled
-     *          flag to enable/disable handshake tracing
-     * @return client builder
-     */
-    public DistributedLogClientBuilder handshakeTracing(boolean enabled) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setHandshakeTracingEnabled(enabled);
-        return newBuilder;
-    }
-
-    /**
-     * Enable checksum on requests to the proxy.
-     *
-     * @param enabled
-     *          flag to enable/disable checksum
-     * @return client builder
-     */
-    public DistributedLogClientBuilder checksum(boolean enabled) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setChecksumEnabled(enabled);
-        return newBuilder;
-    }
-
-    /**
-     * Configure the finagle name string for the server-side routing service.
-     *
-     * @param nameStr name string of the server-side routing service
-     * @return client builder
-     */
-    public DistributedLogClientBuilder serverRoutingServiceFinagleNameStr(String nameStr) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.serverRoutingServiceFinagleName = nameStr;
-        return newBuilder;
-    }
-
-    DistributedLogClientBuilder clientConfig(ClientConfig clientConfig) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig = ClientConfig.newConfig(clientConfig);
-        return newBuilder;
-    }
-
-    /**
-     * Build distributedlog client.
-     *
-     * @return distributedlog client.
-     */
-    public DistributedLogClient build() {
-        return buildClient();
-    }
-
-    /**
-     * Build monitor service client.
-     *
-     * @return monitor service client.
-     */
-    public MonitorServiceClient buildMonitorClient() {
-
-        return buildClient();
-    }
-
-    @SuppressWarnings("unchecked")
-    ClusterClient buildServerRoutingServiceClient(String serverRoutingServiceFinagleName) {
-        ClientBuilder builder = this.clientBuilder;
-        if (null == builder) {
-            builder = ClientBuilder.get()
-                    .tcpConnectTimeout(Duration.fromMilliseconds(200))
-                    .connectTimeout(Duration.fromMilliseconds(200))
-                    .requestTimeout(Duration.fromSeconds(1))
-                    .retries(20);
-            if (!clientConfig.getThriftMux()) {
-                builder = builder.hostConnectionLimit(1);
-            }
-        }
-        if (clientConfig.getThriftMux()) {
-            builder = builder.stack(ThriftMux.client().withClientId(clientId));
-        } else {
-            builder = builder.codec(ThriftClientFramedCodec.apply(Option.apply(clientId)));
-        }
-
-        Name name;
-        try {
-            name = Resolver$.MODULE$.eval(serverRoutingServiceFinagleName);
-        } catch (Exception exc) {
-            logger.error("Exception in Resolver.eval for name {}", serverRoutingServiceFinagleName, exc);
-            throw new RuntimeException(exc);
-        }
-
-        // builder the client
-        Service<ThriftClientRequest, byte[]> client =
-                ClientBuilder.safeBuildFactory(
-                        builder.dest(name).reportTo(statsReceiver.scope("routing"))
-                ).toService();
-        DistributedLogService.ServiceIface service =
-                new DistributedLogService.ServiceToClient(client, new TBinaryProtocol.Factory());
-        return new ClusterClient(client, service);
-    }
-
-    DistributedLogClientImpl buildClient() {
-        checkNotNull(name, "No name provided.");
-        checkNotNull(clientId, "No client id provided.");
-        checkNotNull(routingServiceBuilder, "No routing service builder provided.");
-        checkNotNull(statsReceiver, "No stats receiver provided.");
-        if (null == streamStatsReceiver) {
-            streamStatsReceiver = new NullStatsReceiver();
-        }
-
-        Optional<ClusterClient> serverRoutingServiceClient = Optional.absent();
-        if (null != serverRoutingServiceFinagleName) {
-            serverRoutingServiceClient = Optional.of(
-                    buildServerRoutingServiceClient(serverRoutingServiceFinagleName));
-        }
-
-        RoutingService routingService = routingServiceBuilder
-                .statsReceiver(statsReceiver.scope("routing"))
-                .build();
-        DistributedLogClientImpl clientImpl =
-                new DistributedLogClientImpl(
-                        name,
-                        clientId,
-                        routingService,
-                        clientBuilder,
-                        clientConfig,
-                        serverRoutingServiceClient,
-                        statsReceiver,
-                        streamStatsReceiver,
-                        regionResolver,
-                        enableRegionStats);
-        routingService.startService();
-        clientImpl.handshake();
-        return clientImpl;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/service/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/package-info.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/service/package-info.java
deleted file mode 100644
index e6e56c0..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * DistributedLog Service Client.
- */
-package com.twitter.distributedlog.service;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/ClientConfig.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/ClientConfig.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/ClientConfig.java
new file mode 100644
index 0000000..57e2b5a
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/ClientConfig.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Client Config.
+ */
+public class ClientConfig {
+    int redirectBackoffStartMs = 25;
+    int redirectBackoffMaxMs = 100;
+    int maxRedirects = -1;
+    int requestTimeoutMs = -1;
+    boolean thriftmux = false;
+    boolean streamFailfast = false;
+    String streamNameRegex = ".*";
+    boolean handshakeWithClientInfo = true;
+    long periodicHandshakeIntervalMs = TimeUnit.MINUTES.toMillis(5);
+    long periodicOwnershipSyncIntervalMs = TimeUnit.MINUTES.toMillis(5);
+    boolean periodicDumpOwnershipCacheEnabled = false;
+    long periodicDumpOwnershipCacheIntervalMs = TimeUnit.MINUTES.toMillis(10);
+    boolean enableHandshakeTracing = false;
+    boolean enableChecksum = true;
+
+    public ClientConfig setMaxRedirects(int maxRedirects) {
+        this.maxRedirects = maxRedirects;
+        return this;
+    }
+
+    public int getMaxRedirects() {
+        return this.maxRedirects;
+    }
+
+    public ClientConfig setRequestTimeoutMs(int timeoutInMillis) {
+        this.requestTimeoutMs = timeoutInMillis;
+        return this;
+    }
+
+    public int getRequestTimeoutMs() {
+        return this.requestTimeoutMs;
+    }
+
+    public ClientConfig setRedirectBackoffStartMs(int ms) {
+        this.redirectBackoffStartMs = ms;
+        return this;
+    }
+
+    public int getRedirectBackoffStartMs() {
+        return this.redirectBackoffStartMs;
+    }
+
+    public ClientConfig setRedirectBackoffMaxMs(int ms) {
+        this.redirectBackoffMaxMs = ms;
+        return this;
+    }
+
+    public int getRedirectBackoffMaxMs() {
+        return this.redirectBackoffMaxMs;
+    }
+
+    public ClientConfig setThriftMux(boolean enabled) {
+        this.thriftmux = enabled;
+        return this;
+    }
+
+    public boolean getThriftMux() {
+        return this.thriftmux;
+    }
+
+    public ClientConfig setStreamFailfast(boolean enabled) {
+        this.streamFailfast = enabled;
+        return this;
+    }
+
+    public boolean getStreamFailfast() {
+        return this.streamFailfast;
+    }
+
+    public ClientConfig setStreamNameRegex(String nameRegex) {
+        checkNotNull(nameRegex);
+        this.streamNameRegex = nameRegex;
+        return this;
+    }
+
+    public String getStreamNameRegex() {
+        return this.streamNameRegex;
+    }
+
+    public ClientConfig setHandshakeWithClientInfo(boolean enabled) {
+        this.handshakeWithClientInfo = enabled;
+        return this;
+    }
+
+    public boolean getHandshakeWithClientInfo() {
+        return this.handshakeWithClientInfo;
+    }
+
+    public ClientConfig setPeriodicHandshakeIntervalMs(long intervalMs) {
+        this.periodicHandshakeIntervalMs = intervalMs;
+        return this;
+    }
+
+    public long getPeriodicHandshakeIntervalMs() {
+        return this.periodicHandshakeIntervalMs;
+    }
+
+    public ClientConfig setPeriodicOwnershipSyncIntervalMs(long intervalMs) {
+        this.periodicOwnershipSyncIntervalMs = intervalMs;
+        return this;
+    }
+
+    public long getPeriodicOwnershipSyncIntervalMs() {
+        return this.periodicOwnershipSyncIntervalMs;
+    }
+
+    public ClientConfig setPeriodicDumpOwnershipCacheEnabled(boolean enabled) {
+        this.periodicDumpOwnershipCacheEnabled = enabled;
+        return this;
+    }
+
+    public boolean isPeriodicDumpOwnershipCacheEnabled() {
+        return this.periodicDumpOwnershipCacheEnabled;
+    }
+
+    public ClientConfig setPeriodicDumpOwnershipCacheIntervalMs(long intervalMs) {
+        this.periodicDumpOwnershipCacheIntervalMs = intervalMs;
+        return this;
+    }
+
+    public long getPeriodicDumpOwnershipCacheIntervalMs() {
+        return this.periodicDumpOwnershipCacheIntervalMs;
+    }
+
+    public ClientConfig setHandshakeTracingEnabled(boolean enabled) {
+        this.enableHandshakeTracing = enabled;
+        return this;
+    }
+
+    public boolean isHandshakeTracingEnabled() {
+        return this.enableHandshakeTracing;
+    }
+
+    public ClientConfig setChecksumEnabled(boolean enabled) {
+        this.enableChecksum = enabled;
+        return this;
+    }
+
+    public boolean isChecksumEnabled() {
+        return this.enableChecksum;
+    }
+
+    public static ClientConfig newConfig(ClientConfig config) {
+        ClientConfig newConfig = new ClientConfig();
+        newConfig.setMaxRedirects(config.getMaxRedirects())
+                 .setRequestTimeoutMs(config.getRequestTimeoutMs())
+                 .setRedirectBackoffStartMs(config.getRedirectBackoffStartMs())
+                 .setRedirectBackoffMaxMs(config.getRedirectBackoffMaxMs())
+                 .setThriftMux(config.getThriftMux())
+                 .setStreamFailfast(config.getStreamFailfast())
+                 .setStreamNameRegex(config.getStreamNameRegex())
+                 .setHandshakeWithClientInfo(config.getHandshakeWithClientInfo())
+                 .setPeriodicHandshakeIntervalMs(config.getPeriodicHandshakeIntervalMs())
+                 .setPeriodicDumpOwnershipCacheEnabled(config.isPeriodicDumpOwnershipCacheEnabled())
+                 .setPeriodicDumpOwnershipCacheIntervalMs(config.getPeriodicDumpOwnershipCacheIntervalMs())
+                 .setHandshakeTracingEnabled(config.isHandshakeTracingEnabled())
+                 .setChecksumEnabled(config.isChecksumEnabled());
+        return newConfig;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java
new file mode 100644
index 0000000..1300187
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java
@@ -0,0 +1,1200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.LogRecordSetBuffer;
+import org.apache.distributedlog.client.monitor.MonitorServiceClient;
+import org.apache.distributedlog.client.ownership.OwnershipCache;
+import org.apache.distributedlog.client.proxy.ClusterClient;
+import org.apache.distributedlog.client.proxy.HostProvider;
+import org.apache.distributedlog.client.proxy.ProxyClient;
+import org.apache.distributedlog.client.proxy.ProxyClientManager;
+import org.apache.distributedlog.client.proxy.ProxyListener;
+import org.apache.distributedlog.client.resolver.RegionResolver;
+import org.apache.distributedlog.client.routing.RoutingService;
+import org.apache.distributedlog.client.routing.RoutingService.RoutingContext;
+import org.apache.distributedlog.client.stats.ClientStats;
+import org.apache.distributedlog.client.stats.OpStats;
+import org.apache.distributedlog.exceptions.DLClientClosedException;
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.exceptions.ServiceUnavailableException;
+import org.apache.distributedlog.exceptions.StreamUnavailableException;
+import org.apache.distributedlog.service.DLSocketAddress;
+import org.apache.distributedlog.service.DistributedLogClient;
+import org.apache.distributedlog.thrift.service.BulkWriteResponse;
+import org.apache.distributedlog.thrift.service.HeartbeatOptions;
+import org.apache.distributedlog.thrift.service.ResponseHeader;
+import org.apache.distributedlog.thrift.service.ServerInfo;
+import org.apache.distributedlog.thrift.service.ServerStatus;
+import org.apache.distributedlog.thrift.service.StatusCode;
+import org.apache.distributedlog.thrift.service.WriteContext;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+import org.apache.distributedlog.util.ProtocolUtils;
+import com.twitter.finagle.CancelledRequestException;
+import com.twitter.finagle.ConnectionFailedException;
+import com.twitter.finagle.Failure;
+import com.twitter.finagle.NoBrokersAvailableException;
+import com.twitter.finagle.RequestTimeoutException;
+import com.twitter.finagle.ServiceException;
+import com.twitter.finagle.ServiceTimeoutException;
+import com.twitter.finagle.WriteException;
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.stats.StatsReceiver;
+import com.twitter.finagle.thrift.ClientId;
+import com.twitter.util.Duration;
+import com.twitter.util.Function;
+import com.twitter.util.Function0;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import com.twitter.util.Return;
+import com.twitter.util.Throw;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.thrift.TApplicationException;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.Seq;
+import scala.runtime.AbstractFunction1;
+
+
+/**
+ * Implementation of distributedlog client.
+ */
+public class DistributedLogClientImpl implements DistributedLogClient, MonitorServiceClient,
+        RoutingService.RoutingListener, ProxyListener, HostProvider {
+
+    private static final Logger logger = LoggerFactory.getLogger(DistributedLogClientImpl.class);
+
+    private final String clientName;
+    private final ClientId clientId;
+    private final ClientConfig clientConfig;
+    private final RoutingService routingService;
+    private final ProxyClient.Builder clientBuilder;
+    private final boolean streamFailfast;
+    private final Pattern streamNameRegexPattern;
+
+    // Timer
+    private final HashedWheelTimer dlTimer;
+
+    // region resolver
+    private final RegionResolver regionResolver;
+
+    // Ownership maintenance
+    private final OwnershipCache ownershipCache;
+    // Channel/Client management
+    private final ProxyClientManager clientManager;
+    // Cluster Client (for routing service)
+    private final Optional<ClusterClient> clusterClient;
+
+    // Close Status
+    private boolean closed = false;
+    private final ReentrantReadWriteLock closeLock =
+            new ReentrantReadWriteLock();
+
+    abstract class StreamOp implements TimerTask {
+        final String stream;
+
+        final AtomicInteger tries = new AtomicInteger(0);
+        final RoutingContext routingContext = RoutingContext.of(regionResolver);
+        final WriteContext ctx = new WriteContext();
+        final Stopwatch stopwatch;
+        final OpStats opStats;
+        SocketAddress nextAddressToSend;
+
+        StreamOp(final String stream, final OpStats opStats) {
+            this.stream = stream;
+            this.stopwatch = Stopwatch.createStarted();
+            this.opStats = opStats;
+        }
+
+        boolean shouldTimeout() {
+            long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+            return shouldTimeout(elapsedMs);
+        }
+
+        boolean shouldTimeout(long elapsedMs) {
+            return clientConfig.getRequestTimeoutMs() > 0
+                && elapsedMs >= clientConfig.getRequestTimeoutMs();
+        }
+
+        void send(SocketAddress address) {
+            long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+            if (clientConfig.getMaxRedirects() > 0
+                && tries.get() >= clientConfig.getMaxRedirects()) {
+                fail(address, new RequestTimeoutException(Duration.fromMilliseconds(elapsedMs),
+                        "Exhausted max redirects in " + elapsedMs + " ms"));
+                return;
+            } else if (shouldTimeout(elapsedMs)) {
+                fail(address, new RequestTimeoutException(Duration.fromMilliseconds(elapsedMs),
+                        "Exhausted max request timeout " + clientConfig.getRequestTimeoutMs()
+                                + " in " + elapsedMs + " ms"));
+                return;
+            }
+            synchronized (this) {
+                String addrStr = address.toString();
+                if (ctx.isSetTriedHosts() && ctx.getTriedHosts().contains(addrStr)) {
+                    nextAddressToSend = address;
+                    dlTimer.newTimeout(this,
+                            Math.min(clientConfig.getRedirectBackoffMaxMs(),
+                                    tries.get() * clientConfig.getRedirectBackoffStartMs()),
+                            TimeUnit.MILLISECONDS);
+                } else {
+                    doSend(address);
+                }
+            }
+        }
+
+        abstract Future<ResponseHeader> sendRequest(ProxyClient sc);
+
+        void doSend(SocketAddress address) {
+            ctx.addToTriedHosts(address.toString());
+            if (clientConfig.isChecksumEnabled()) {
+                Long crc32 = computeChecksum();
+                if (null != crc32) {
+                    ctx.setCrc32(crc32);
+                }
+            }
+            tries.incrementAndGet();
+            sendWriteRequest(address, this);
+        }
+
+        void beforeComplete(ProxyClient sc, ResponseHeader responseHeader) {
+            ownershipCache.updateOwner(stream, sc.getAddress());
+        }
+
+        void complete(SocketAddress address) {
+            stopwatch.stop();
+            opStats.completeRequest(address,
+                    stopwatch.elapsed(TimeUnit.MICROSECONDS), tries.get());
+        }
+
+        void fail(SocketAddress address, Throwable t) {
+            stopwatch.stop();
+            opStats.failRequest(address,
+                    stopwatch.elapsed(TimeUnit.MICROSECONDS), tries.get());
+        }
+
+        Long computeChecksum() {
+            return null;
+        }
+
+        @Override
+        public synchronized void run(Timeout timeout) throws Exception {
+            if (!timeout.isCancelled() && null != nextAddressToSend) {
+                doSend(nextAddressToSend);
+            } else {
+                fail(null, new CancelledRequestException());
+            }
+        }
+    }
+
+    class BulkWriteOp extends StreamOp {
+
+        final List<ByteBuffer> data;
+        final ArrayList<Promise<DLSN>> results;
+
+        BulkWriteOp(final String name, final List<ByteBuffer> data) {
+            super(name, clientStats.getOpStats("bulk_write"));
+            this.data = data;
+
+            // This could take a while (relatively speaking) for very large inputs. We probably don't want
+            // to go so large for other reasons though.
+            this.results = new ArrayList<Promise<DLSN>>(data.size());
+            for (int i = 0; i < data.size(); i++) {
+                checkNotNull(data.get(i));
+                this.results.add(new Promise<DLSN>());
+            }
+        }
+
+        @Override
+        Future<ResponseHeader> sendRequest(final ProxyClient sc) {
+            return sc.getService().writeBulkWithContext(stream, data, ctx)
+                .addEventListener(new FutureEventListener<BulkWriteResponse>() {
+                @Override
+                public void onSuccess(BulkWriteResponse response) {
+                    // For non-success case, the ResponseHeader handler (the caller) will handle it.
+                    // Note success in this case means no finagle errors have occurred
+                    // (such as finagle connection issues). In general code != SUCCESS means there's some error
+                    // reported by dlog service. The caller will handle such errors.
+                    if (response.getHeader().getCode() == StatusCode.SUCCESS) {
+                        beforeComplete(sc, response.getHeader());
+                        BulkWriteOp.this.complete(sc.getAddress(), response);
+                        if (response.getWriteResponses().size() == 0 && data.size() > 0) {
+                            logger.error("non-empty bulk write got back empty response without failure for stream {}",
+                                stream);
+                        }
+                    }
+                }
+                @Override
+                public void onFailure(Throwable cause) {
+                    // Handled by the ResponseHeader listener (attached by the caller).
+                }
+            }).map(new AbstractFunction1<BulkWriteResponse, ResponseHeader>() {
+                @Override
+                public ResponseHeader apply(BulkWriteResponse response) {
+                    // We need to return the ResponseHeader to the caller's listener to process DLOG errors.
+                    return response.getHeader();
+                }
+            });
+        }
+
+        void complete(SocketAddress address, BulkWriteResponse bulkWriteResponse) {
+            super.complete(address);
+            Iterator<WriteResponse> writeResponseIterator = bulkWriteResponse.getWriteResponses().iterator();
+            Iterator<Promise<DLSN>> resultIterator = results.iterator();
+
+            // Fill in errors from thrift responses.
+            while (resultIterator.hasNext() && writeResponseIterator.hasNext()) {
+                Promise<DLSN> result = resultIterator.next();
+                WriteResponse writeResponse = writeResponseIterator.next();
+                if (StatusCode.SUCCESS == writeResponse.getHeader().getCode()) {
+                    result.setValue(DLSN.deserialize(writeResponse.getDlsn()));
+                } else {
+                    result.setException(DLException.of(writeResponse.getHeader()));
+                }
+            }
+
+            // Should never happen, but just in case so there's some record.
+            if (bulkWriteResponse.getWriteResponses().size() != data.size()) {
+                logger.error("wrong number of results, response = {} records = {}",
+                    bulkWriteResponse.getWriteResponses().size(), data.size());
+            }
+        }
+
+        @Override
+        void fail(SocketAddress address, Throwable t) {
+
+            // StreamOp.fail is called to fail the overall request. In case of BulkWriteOp we take the request level
+            // exception to apply to the first write. In fact for request level exceptions no request has ever been
+            // attempted, but logically we associate the error with the first write.
+            super.fail(address, t);
+            Iterator<Promise<DLSN>> resultIterator = results.iterator();
+
+            // Fail the first write with the batch level failure.
+            if (resultIterator.hasNext()) {
+                Promise<DLSN> result = resultIterator.next();
+                result.setException(t);
+            }
+
+            // Fail the remaining writes as cancelled requests.
+            while (resultIterator.hasNext()) {
+                Promise<DLSN> result = resultIterator.next();
+                result.setException(new CancelledRequestException());
+            }
+        }
+
+        @SuppressWarnings("unchecked")
+        List<Future<DLSN>> result() {
+            return (List) results;
+        }
+    }
+
+    abstract class AbstractWriteOp extends StreamOp {
+
+        final Promise<WriteResponse> result = new Promise<WriteResponse>();
+        Long crc32 = null;
+
+        AbstractWriteOp(final String name, final OpStats opStats) {
+            super(name, opStats);
+        }
+
+        void complete(SocketAddress address, WriteResponse response) {
+            super.complete(address);
+            result.setValue(response);
+        }
+
+        @Override
+        void fail(SocketAddress address, Throwable t) {
+            super.fail(address, t);
+            result.setException(t);
+        }
+
+        @Override
+        Long computeChecksum() {
+            if (null == crc32) {
+                crc32 = ProtocolUtils.streamOpCRC32(stream);
+            }
+            return crc32;
+        }
+
+        @Override
+        Future<ResponseHeader> sendRequest(final ProxyClient sc) {
+            return this.sendWriteRequest(sc).addEventListener(new FutureEventListener<WriteResponse>() {
+                @Override
+                public void onSuccess(WriteResponse response) {
+                    if (response.getHeader().getCode() == StatusCode.SUCCESS) {
+                        beforeComplete(sc, response.getHeader());
+                        AbstractWriteOp.this.complete(sc.getAddress(), response);
+                    }
+                }
+                @Override
+                public void onFailure(Throwable cause) {
+                    // handled by the ResponseHeader listener
+                }
+            }).map(new AbstractFunction1<WriteResponse, ResponseHeader>() {
+                @Override
+                public ResponseHeader apply(WriteResponse response) {
+                    return response.getHeader();
+                }
+            });
+        }
+
+        abstract Future<WriteResponse> sendWriteRequest(ProxyClient sc);
+    }
+
+    class WriteOp extends AbstractWriteOp {
+        final ByteBuffer data;
+
+        WriteOp(final String name, final ByteBuffer data) {
+            super(name, clientStats.getOpStats("write"));
+            this.data = data;
+        }
+
+        @Override
+        Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
+            return sc.getService().writeWithContext(stream, data, ctx);
+        }
+
+        @Override
+        Long computeChecksum() {
+            if (null == crc32) {
+                byte[] dataBytes = new byte[data.remaining()];
+                data.duplicate().get(dataBytes);
+                crc32 = ProtocolUtils.writeOpCRC32(stream, dataBytes);
+            }
+            return crc32;
+        }
+
+        Future<DLSN> result() {
+            return result.map(new AbstractFunction1<WriteResponse, DLSN>() {
+                @Override
+                public DLSN apply(WriteResponse response) {
+                    return DLSN.deserialize(response.getDlsn());
+                }
+            });
+        }
+    }
+
+    class TruncateOp extends AbstractWriteOp {
+        final DLSN dlsn;
+
+        TruncateOp(String name, DLSN dlsn) {
+            super(name, clientStats.getOpStats("truncate"));
+            this.dlsn = dlsn;
+        }
+
+        @Override
+        Long computeChecksum() {
+            if (null == crc32) {
+                crc32 = ProtocolUtils.truncateOpCRC32(stream, dlsn);
+            }
+            return crc32;
+        }
+
+        @Override
+        Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
+            return sc.getService().truncate(stream, dlsn.serialize(), ctx);
+        }
+
+        Future<Boolean> result() {
+            return result.map(new AbstractFunction1<WriteResponse, Boolean>() {
+                @Override
+                public Boolean apply(WriteResponse response) {
+                    return true;
+                }
+            });
+        }
+    }
+
+    class WriteRecordSetOp extends WriteOp {
+
+        WriteRecordSetOp(String name, LogRecordSetBuffer recordSet) {
+            super(name, recordSet.getBuffer());
+            ctx.setIsRecordSet(true);
+        }
+
+    }
+
+
+    class ReleaseOp extends AbstractWriteOp {
+
+        ReleaseOp(String name) {
+            super(name, clientStats.getOpStats("release"));
+        }
+
+        @Override
+        Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
+            return sc.getService().release(stream, ctx);
+        }
+
+        @Override
+        void beforeComplete(ProxyClient sc, ResponseHeader header) {
+            ownershipCache.removeOwnerFromStream(stream, sc.getAddress(), "Stream Deleted");
+        }
+
+        Future<Void> result() {
+            return result.map(new AbstractFunction1<WriteResponse, Void>() {
+                @Override
+                public Void apply(WriteResponse response) {
+                    return null;
+                }
+            });
+        }
+    }
+
+    class DeleteOp extends AbstractWriteOp {
+
+        DeleteOp(String name) {
+            super(name, clientStats.getOpStats("delete"));
+        }
+
+        @Override
+        Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
+            return sc.getService().delete(stream, ctx);
+        }
+
+        @Override
+        void beforeComplete(ProxyClient sc, ResponseHeader header) {
+            ownershipCache.removeOwnerFromStream(stream, sc.getAddress(), "Stream Deleted");
+        }
+
+        Future<Void> result() {
+            return result.map(new AbstractFunction1<WriteResponse, Void>() {
+                @Override
+                public Void apply(WriteResponse v1) {
+                    return null;
+                }
+            });
+        }
+    }
+
+    class CreateOp extends AbstractWriteOp {
+
+        CreateOp(String name) {
+            super(name, clientStats.getOpStats("create"));
+        }
+
+        @Override
+        Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
+            return sc.getService().create(stream, ctx);
+        }
+
+        @Override
+        void beforeComplete(ProxyClient sc, ResponseHeader header) {
+            ownershipCache.updateOwner(stream, sc.getAddress());
+        }
+
+        Future<Void> result() {
+            return result.map(new AbstractFunction1<WriteResponse, Void>() {
+                @Override
+                public Void apply(WriteResponse v1) {
+                    return null;
+                }
+            }).voided();
+        }
+    }
+
+    class HeartbeatOp extends AbstractWriteOp {
+        HeartbeatOptions options;
+
+        HeartbeatOp(String name, boolean sendReaderHeartBeat) {
+            super(name, clientStats.getOpStats("heartbeat"));
+            options = new HeartbeatOptions();
+            options.setSendHeartBeatToReader(sendReaderHeartBeat);
+        }
+
+        @Override
+        Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
+            return sc.getService().heartbeatWithOptions(stream, ctx, options);
+        }
+
+        Future<Void> result() {
+            return result.map(new AbstractFunction1<WriteResponse, Void>() {
+                @Override
+                public Void apply(WriteResponse response) {
+                    return null;
+                }
+            });
+        }
+    }
+
+    // Stats
+    private final ClientStats clientStats;
+
+    public DistributedLogClientImpl(String name,
+                                    ClientId clientId,
+                                    RoutingService routingService,
+                                    ClientBuilder clientBuilder,
+                                    ClientConfig clientConfig,
+                                    Optional<ClusterClient> clusterClient,
+                                    StatsReceiver statsReceiver,
+                                    StatsReceiver streamStatsReceiver,
+                                    RegionResolver regionResolver,
+                                    boolean enableRegionStats) {
+        this.clientName = name;
+        this.clientId = clientId;
+        this.routingService = routingService;
+        this.clientConfig = clientConfig;
+        this.streamFailfast = clientConfig.getStreamFailfast();
+        this.streamNameRegexPattern = Pattern.compile(clientConfig.getStreamNameRegex());
+        this.regionResolver = regionResolver;
+        // Build the timer
+        this.dlTimer = new HashedWheelTimer(
+                new ThreadFactoryBuilder().setNameFormat("DLClient-" + name + "-timer-%d").build(),
+                this.clientConfig.getRedirectBackoffStartMs(),
+                TimeUnit.MILLISECONDS);
+        // register routing listener
+        this.routingService.registerListener(this);
+        // build the ownership cache
+        this.ownershipCache = new OwnershipCache(this.clientConfig, this.dlTimer, statsReceiver, streamStatsReceiver);
+        // Client Stats
+        this.clientStats = new ClientStats(statsReceiver, enableRegionStats, regionResolver);
+        // Client Manager
+        this.clientBuilder = ProxyClient.newBuilder(clientName, clientId, clientBuilder, clientConfig, clientStats);
+        this.clientManager = new ProxyClientManager(
+                this.clientConfig,  // client config
+                this.clientBuilder, // client builder
+                this.dlTimer,       // timer
+                this,               // host provider
+                clientStats);       // client stats
+        this.clusterClient = clusterClient;
+        this.clientManager.registerProxyListener(this);
+
+        // Cache Stats
+        StatsReceiver cacheStatReceiver = statsReceiver.scope("cache");
+        Seq<String> numCachedStreamsGaugeName =
+                scala.collection.JavaConversions.asScalaBuffer(Arrays.asList("num_streams")).toList();
+        cacheStatReceiver.provideGauge(numCachedStreamsGaugeName, new Function0<Object>() {
+            @Override
+            public Object apply() {
+                return (float) ownershipCache.getNumCachedStreams();
+            }
+        });
+        Seq<String> numCachedHostsGaugeName =
+                scala.collection.JavaConversions.asScalaBuffer(Arrays.asList("num_hosts")).toList();
+        cacheStatReceiver.provideGauge(numCachedHostsGaugeName, new Function0<Object>() {
+            @Override
+            public Object apply() {
+                return (float) clientManager.getNumProxies();
+            }
+        });
+
+        logger.info("Build distributedlog client : name = {}, client_id = {}, routing_service = {},"
+            + " stats_receiver = {}, thriftmux = {}",
+            new Object[] {
+                name,
+                clientId,
+                routingService.getClass(),
+                statsReceiver.getClass(),
+                clientConfig.getThriftMux()
+            });
+    }
+
+    @Override
+    public Set<SocketAddress> getHosts() {
+        Set<SocketAddress> hosts = Sets.newHashSet();
+        // if using server side routing, we only handshake with the hosts in ownership cache.
+        if (!clusterClient.isPresent()) {
+            hosts.addAll(this.routingService.getHosts());
+        }
+        hosts.addAll(this.ownershipCache.getStreamOwnershipDistribution().keySet());
+        return hosts;
+    }
+
+    @Override
+    public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
+        if (null != serverInfo
+            && serverInfo.isSetServerStatus()
+            && ServerStatus.DOWN == serverInfo.getServerStatus()) {
+            logger.info("{} is detected as DOWN during handshaking", address);
+            // server is shutting down
+            handleServiceUnavailable(address, client, Optional.<StreamOp>absent());
+            return;
+        }
+
+        if (null != serverInfo && serverInfo.isSetOwnerships()) {
+            Map<String, String> ownerships = serverInfo.getOwnerships();
+            logger.debug("Handshaked with {} : {} ownerships returned.", address, ownerships.size());
+            for (Map.Entry<String, String> entry : ownerships.entrySet()) {
+                Matcher matcher = streamNameRegexPattern.matcher(entry.getKey());
+                if (!matcher.matches()) {
+                    continue;
+                }
+                updateOwnership(entry.getKey(), entry.getValue());
+            }
+        } else {
+            logger.debug("Handshaked with {} : no ownerships returned", address);
+        }
+    }
+
+    @Override
+    public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
+        cause = showRootCause(Optional.<StreamOp>absent(), cause);
+        handleRequestException(address, client, Optional.<StreamOp>absent(), cause);
+    }
+
+    @VisibleForTesting
+    public void handshake() {
+        clientManager.handshake();
+        logger.info("Handshaked with {} hosts, cached {} streams",
+                clientManager.getNumProxies(), ownershipCache.getNumCachedStreams());
+    }
+
+    @Override
+    public void onServerLeft(SocketAddress address) {
+        onServerLeft(address, null);
+    }
+
+    private void onServerLeft(SocketAddress address, ProxyClient sc) {
+        ownershipCache.removeAllStreamsFromOwner(address);
+        if (null == sc) {
+            clientManager.removeClient(address);
+        } else {
+            clientManager.removeClient(address, sc);
+        }
+    }
+
+    @Override
+    public void onServerJoin(SocketAddress address) {
+        // we only pre-create connection for client-side routing
+        // if it is server side routing, we only know the exact proxy address
+        // when #getOwner.
+        if (!clusterClient.isPresent()) {
+            clientManager.createClient(address);
+        }
+    }
+
+    public void close() {
+        closeLock.writeLock().lock();
+        try {
+            if (closed) {
+                return;
+            }
+            closed = true;
+        } finally {
+            closeLock.writeLock().unlock();
+        }
+        clientManager.close();
+        routingService.unregisterListener(this);
+        routingService.stopService();
+        dlTimer.stop();
+    }
+
+    @Override
+    public Future<Void> check(String stream) {
+        final HeartbeatOp op = new HeartbeatOp(stream, false);
+        sendRequest(op);
+        return op.result();
+    }
+
+    @Override
+    public Future<Void> heartbeat(String stream) {
+        final HeartbeatOp op = new HeartbeatOp(stream, true);
+        sendRequest(op);
+        return op.result();
+    }
+
+    @Override
+    public Map<SocketAddress, Set<String>> getStreamOwnershipDistribution() {
+        return ownershipCache.getStreamOwnershipDistribution();
+    }
+
+    @Override
+    public Future<Void> setAcceptNewStream(boolean enabled) {
+        Map<SocketAddress, ProxyClient> snapshot = clientManager.getAllClients();
+        List<Future<Void>> futures = new ArrayList<Future<Void>>(snapshot.size());
+        for (Map.Entry<SocketAddress, ProxyClient> entry : snapshot.entrySet()) {
+            futures.add(entry.getValue().getService().setAcceptNewStream(enabled));
+        }
+        return Future.collect(futures).map(new Function<List<Void>, Void>() {
+            @Override
+            public Void apply(List<Void> list) {
+                return null;
+            }
+        });
+    }
+
+    @Override
+    public Future<DLSN> write(String stream, ByteBuffer data) {
+        final WriteOp op = new WriteOp(stream, data);
+        sendRequest(op);
+        return op.result();
+    }
+
+    @Override
+    public Future<DLSN> writeRecordSet(String stream, final LogRecordSetBuffer recordSet) {
+        final WriteRecordSetOp op = new WriteRecordSetOp(stream, recordSet);
+        sendRequest(op);
+        return op.result();
+    }
+
+    @Override
+    public List<Future<DLSN>> writeBulk(String stream, List<ByteBuffer> data) {
+        if (data.size() > 0) {
+            final BulkWriteOp op = new BulkWriteOp(stream, data);
+            sendRequest(op);
+            return op.result();
+        } else {
+            return Collections.emptyList();
+        }
+    }
+
+    @Override
+    public Future<Boolean> truncate(String stream, DLSN dlsn) {
+        final TruncateOp op = new TruncateOp(stream, dlsn);
+        sendRequest(op);
+        return op.result();
+    }
+
+    @Override
+    public Future<Void> delete(String stream) {
+        final DeleteOp op = new DeleteOp(stream);
+        sendRequest(op);
+        return op.result();
+    }
+
+    @Override
+    public Future<Void> release(String stream) {
+        final ReleaseOp op = new ReleaseOp(stream);
+        sendRequest(op);
+        return op.result();
+    }
+
+    @Override
+    public Future<Void> create(String stream) {
+        final CreateOp op = new CreateOp(stream);
+        sendRequest(op);
+        return op.result();
+    }
+
+    private void sendRequest(final StreamOp op) {
+        closeLock.readLock().lock();
+        try {
+            if (closed) {
+                op.fail(null, new DLClientClosedException("Client " + clientName + " is closed."));
+            } else {
+                doSend(op, null);
+            }
+        } finally {
+            closeLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Send the stream operation by routing service, excluding previous address if it is not null.
+     *
+     * @param op
+     *          stream operation.
+     * @param previousAddr
+     *          previous tried address.
+     */
+    private void doSend(final StreamOp op, final SocketAddress previousAddr) {
+        if (null != previousAddr) {
+            op.routingContext.addTriedHost(previousAddr, StatusCode.WRITE_EXCEPTION);
+        }
+        // Get host first
+        final SocketAddress address = ownershipCache.getOwner(op.stream);
+        if (null == address || op.routingContext.isTriedHost(address)) {
+            getOwner(op).addEventListener(new FutureEventListener<SocketAddress>() {
+                @Override
+                public void onFailure(Throwable cause) {
+                    op.fail(null, cause);
+                }
+
+                @Override
+                public void onSuccess(SocketAddress ownerAddr) {
+                    op.send(ownerAddr);
+                }
+            });
+        } else {
+            op.send(address);
+        }
+    }
+
+    private void retryGetOwnerFromResourcePlacementServer(final StreamOp op,
+                                                final Promise<SocketAddress> getOwnerPromise,
+                                                final Throwable cause) {
+        if (op.shouldTimeout()) {
+            op.fail(null, cause);
+            return;
+        }
+        getOwnerFromResourcePlacementServer(op, getOwnerPromise);
+    }
+
+    private void getOwnerFromResourcePlacementServer(final StreamOp op,
+                                                     final Promise<SocketAddress> getOwnerPromise) {
+        clusterClient.get().getService().getOwner(op.stream, op.ctx)
+            .addEventListener(new FutureEventListener<WriteResponse>() {
+                @Override
+                public void onFailure(Throwable cause) {
+                    getOwnerPromise.updateIfEmpty(new Throw<SocketAddress>(cause));
+                }
+
+                @Override
+                public void onSuccess(WriteResponse value) {
+                    if (StatusCode.FOUND == value.getHeader().getCode()
+                          && null != value.getHeader().getLocation()) {
+                        try {
+                            InetSocketAddress addr = DLSocketAddress.deserialize(
+                                value.getHeader().getLocation()
+                            ).getSocketAddress();
+                            getOwnerPromise.updateIfEmpty(new Return<SocketAddress>(addr));
+                        } catch (IOException e) {
+                            // retry from the routing server again
+                            logger.error("ERROR in getOwner", e);
+                            retryGetOwnerFromResourcePlacementServer(op, getOwnerPromise, e);
+                            return;
+                        }
+                    } else {
+                        // retry from the routing server again
+                        retryGetOwnerFromResourcePlacementServer(op, getOwnerPromise,
+                                new StreamUnavailableException("Stream " + op.stream + "'s owner is unknown"));
+                    }
+                }
+            });
+    }
+
+    private Future<SocketAddress> getOwner(final StreamOp op) {
+        if (clusterClient.isPresent()) {
+            final Promise<SocketAddress> getOwnerPromise = new Promise<SocketAddress>();
+            getOwnerFromResourcePlacementServer(op, getOwnerPromise);
+            return getOwnerPromise;
+        }
+        // pickup host by hashing
+        try {
+            return Future.value(routingService.getHost(op.stream, op.routingContext));
+        } catch (NoBrokersAvailableException nbae) {
+            return Future.exception(nbae);
+        }
+    }
+
+    private void sendWriteRequest(final SocketAddress addr, final StreamOp op) {
+        // Get corresponding finagle client
+        final ProxyClient sc = clientManager.getClient(addr);
+        final long startTimeNanos = System.nanoTime();
+        // write the request to that host.
+        op.sendRequest(sc).addEventListener(new FutureEventListener<ResponseHeader>() {
+            @Override
+            public void onSuccess(ResponseHeader header) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Received response; header: {}", header);
+                }
+                clientStats.completeProxyRequest(addr, header.getCode(), startTimeNanos);
+                // update routing context
+                op.routingContext.addTriedHost(addr, header.getCode());
+                switch (header.getCode()) {
+                    case SUCCESS:
+                        // success handling is done per stream op
+                        break;
+                    case FOUND:
+                        handleRedirectResponse(header, op, addr);
+                        break;
+                    // for overcapacity, dont report failure since this normally happens quite a bit
+                    case OVER_CAPACITY:
+                        logger.debug("Failed to write request to {} : {}", op.stream, header);
+                        op.fail(addr, DLException.of(header));
+                        break;
+                    // for responses that indicate the requests definitely failed,
+                    // we should fail them immediately (e.g. TOO_LARGE_RECORD, METADATA_EXCEPTION)
+                    case NOT_IMPLEMENTED:
+                    case METADATA_EXCEPTION:
+                    case LOG_EMPTY:
+                    case LOG_NOT_FOUND:
+                    case TRUNCATED_TRANSACTION:
+                    case END_OF_STREAM:
+                    case TRANSACTION_OUT_OF_ORDER:
+                    case INVALID_STREAM_NAME:
+                    case REQUEST_DENIED:
+                    case TOO_LARGE_RECORD:
+                    case CHECKSUM_FAILED:
+                    // status code NOT_READY is returned if failfast is enabled in the server. don't redirect
+                    // since the proxy may still own the stream.
+                    case STREAM_NOT_READY:
+                        op.fail(addr, DLException.of(header));
+                        break;
+                    case SERVICE_UNAVAILABLE:
+                        handleServiceUnavailable(addr, sc, Optional.of(op));
+                        break;
+                    case REGION_UNAVAILABLE:
+                        // region is unavailable, redirect the request to hosts in other region
+                        redirect(op, null);
+                        break;
+                    // Proxy was overloaded and refused to try to acquire the stream. Don't remove ownership, since
+                    // we didn't have it in the first place.
+                    case TOO_MANY_STREAMS:
+                        handleRedirectableError(addr, op, header);
+                        break;
+                    case STREAM_UNAVAILABLE:
+                    case ZOOKEEPER_ERROR:
+                    case LOCKING_EXCEPTION:
+                    case UNEXPECTED:
+                    case INTERRUPTED:
+                    case BK_TRANSMIT_ERROR:
+                    case FLUSH_TIMEOUT:
+                    default:
+                        // when we are receiving these exceptions from proxy, it means proxy or the stream is closed
+                        // redirect the request.
+                        ownershipCache.removeOwnerFromStream(op.stream, addr, header.getCode().name());
+                        handleRedirectableError(addr, op, header);
+                        break;
+                }
+            }
+
+            @Override
+            public void onFailure(Throwable cause) {
+                Optional<StreamOp> opOptional = Optional.of(op);
+                cause = showRootCause(opOptional, cause);
+                clientStats.failProxyRequest(addr, cause, startTimeNanos);
+                handleRequestException(addr, sc, opOptional, cause);
+            }
+        });
+    }
+
+    // Response Handlers
+
+    Throwable showRootCause(Optional<StreamOp> op, Throwable cause) {
+        if (cause instanceof Failure) {
+            Failure failure = (Failure) cause;
+            if (failure.isFlagged(Failure.Wrapped())) {
+                try {
+                    // if it is a wrapped failure, unwrap it first
+                    cause = failure.show();
+                } catch (IllegalArgumentException iae) {
+                    if (op.isPresent()) {
+                        logger.warn("Failed to unwrap finagle failure of stream {} : ", op.get().stream, iae);
+                    } else {
+                        logger.warn("Failed to unwrap finagle failure : ", iae);
+                    }
+                }
+            }
+        }
+        return cause;
+    }
+
+    private void handleRedirectableError(SocketAddress addr,
+                                         StreamOp op,
+                                         ResponseHeader header) {
+        if (streamFailfast) {
+            op.fail(addr, DLException.of(header));
+        } else {
+            redirect(op, null);
+        }
+    }
+
+    void handleServiceUnavailable(SocketAddress addr,
+                                  ProxyClient sc,
+                                  Optional<StreamOp> op) {
+        // service is unavailable, remove it out of routing service
+        routingService.removeHost(addr, new ServiceUnavailableException(addr + " is unavailable now."));
+        onServerLeft(addr);
+        if (op.isPresent()) {
+            ownershipCache.removeOwnerFromStream(op.get().stream, addr, addr + " is unavailable now.");
+            // redirect the request to other host.
+            redirect(op.get(), null);
+        }
+    }
+
+    void handleRequestException(SocketAddress addr,
+                                ProxyClient sc,
+                                Optional<StreamOp> op,
+                                Throwable cause) {
+        boolean resendOp = false;
+        boolean removeOwnerFromStream = false;
+        SocketAddress previousAddr = addr;
+        String reason = cause.getMessage();
+        if (cause instanceof ConnectionFailedException || cause instanceof java.net.ConnectException) {
+            routingService.removeHost(addr, cause);
+            onServerLeft(addr, sc);
+            removeOwnerFromStream = true;
+            // redirect the request to other host.
+            resendOp = true;
+        } else if (cause instanceof ChannelException) {
+            // java.net.ConnectException typically means connection is refused remotely
+            // no process listening on remote address/port.
+            if (cause.getCause() instanceof java.net.ConnectException) {
+                routingService.removeHost(addr, cause.getCause());
+                onServerLeft(addr);
+                reason = cause.getCause().getMessage();
+            } else {
+                routingService.removeHost(addr, cause);
+                reason = cause.getMessage();
+            }
+            removeOwnerFromStream = true;
+            // redirect the request to other host.
+            resendOp = true;
+        } else if (cause instanceof ServiceTimeoutException) {
+            // redirect the request to itself again, which will backoff for a while
+            resendOp = true;
+            previousAddr = null;
+        } else if (cause instanceof WriteException) {
+            // redirect the request to other host.
+            resendOp = true;
+        } else if (cause instanceof ServiceException) {
+            // redirect the request to other host.
+            clientManager.removeClient(addr, sc);
+            resendOp = true;
+        } else if (cause instanceof TApplicationException) {
+            handleTApplicationException(cause, op, addr, sc);
+        } else if (cause instanceof Failure) {
+            handleFinagleFailure((Failure) cause, op, addr);
+        } else {
+            // Default handler
+            handleException(cause, op, addr);
+        }
+
+        if (op.isPresent()) {
+            if (removeOwnerFromStream) {
+                ownershipCache.removeOwnerFromStream(op.get().stream, addr, reason);
+            }
+            if (resendOp) {
+                doSend(op.get(), previousAddr);
+            }
+        }
+    }
+
+    /**
+     * Redirect the request to new proxy <i>newAddr</i>. If <i>newAddr</i> is null,
+     * it would pick up a host from routing service.
+     *
+     * @param op
+     *          stream operation
+     * @param newAddr
+     *          new proxy address
+     */
+    void redirect(StreamOp op, SocketAddress newAddr) {
+        ownershipCache.getOwnershipStatsLogger().onRedirect(op.stream);
+        if (null != newAddr) {
+            logger.debug("Redirect request {} to new owner {}.", op, newAddr);
+            op.send(newAddr);
+        } else {
+            doSend(op, null);
+        }
+    }
+
+    void handleFinagleFailure(Failure failure,
+                              Optional<StreamOp> op,
+                              SocketAddress addr) {
+        if (failure.isFlagged(Failure.Restartable())) {
+            if (op.isPresent()) {
+                // redirect the request to other host
+                doSend(op.get(), addr);
+            }
+        } else {
+            // fail the request if it is other types of failures
+            handleException(failure, op, addr);
+        }
+    }
+
+    void handleException(Throwable cause,
+                         Optional<StreamOp> op,
+                         SocketAddress addr) {
+        // RequestTimeoutException: fail it and let client decide whether to retry or not.
+
+        // FailedFastException:
+        // We don't actually know when FailedFastException will be thrown
+        // so properly we just throw it back to application to let application
+        // handle it.
+
+        // Other Exceptions: as we don't know how to handle them properly so throw them to client
+        if (op.isPresent()) {
+            logger.error("Failed to write request to {} @ {} : {}",
+                    new Object[]{op.get().stream, addr, cause.toString()});
+            op.get().fail(addr, cause);
+        }
+    }
+
+    void handleTApplicationException(Throwable cause,
+                                     Optional<StreamOp> op,
+                                     SocketAddress addr,
+                                     ProxyClient sc) {
+        TApplicationException ex = (TApplicationException) cause;
+        if (ex.getType() == TApplicationException.UNKNOWN_METHOD) {
+            // if we encountered unknown method exception on thrift server, it means this proxy
+            // has problem. we should remove it from routing service, clean up ownerships
+            routingService.removeHost(addr, cause);
+            onServerLeft(addr, sc);
+            if (op.isPresent()) {
+                ownershipCache.removeOwnerFromStream(op.get().stream, addr, cause.getMessage());
+                doSend(op.get(), addr);
+            }
+        } else {
+            handleException(cause, op, addr);
+        }
+    }
+
+    void handleRedirectResponse(ResponseHeader header, StreamOp op, SocketAddress curAddr) {
+        SocketAddress ownerAddr = null;
+        if (header.isSetLocation()) {
+            String owner = header.getLocation();
+            try {
+                ownerAddr = DLSocketAddress.deserialize(owner).getSocketAddress();
+                // if we are receiving a direct request to same host, we won't try the same host.
+                // as the proxy will shut itself down if it redirects client to itself.
+                if (curAddr.equals(ownerAddr)) {
+                    logger.warn("Request to stream {} is redirected to same server {}!", op.stream, curAddr);
+                    ownerAddr = null;
+                } else {
+                    // update ownership when redirects.
+                    ownershipCache.updateOwner(op.stream, ownerAddr);
+                }
+            } catch (IOException e) {
+                ownerAddr = null;
+            }
+        }
+        redirect(op, ownerAddr);
+    }
+
+    void updateOwnership(String stream, String location) {
+        try {
+            SocketAddress ownerAddr = DLSocketAddress.deserialize(location).getSocketAddress();
+            // update ownership
+            ownershipCache.updateOwner(stream, ownerAddr);
+        } catch (IOException e) {
+            logger.warn("Invalid ownership {} found for stream {} : ",
+                new Object[] { location, stream, e });
+        }
+    }
+
+}


Mime
View raw message