distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [28/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException
Date Mon, 12 Jun 2017 15:45:35 GMT
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClient.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClient.java
deleted file mode 100644
index 6ef1d8e..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClient.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.proxy;
-
-import org.apache.distributedlog.client.ClientConfig;
-import org.apache.distributedlog.client.stats.ClientStats;
-import org.apache.distributedlog.thrift.service.DistributedLogService;
-import com.twitter.finagle.Service;
-import com.twitter.finagle.ThriftMux;
-import com.twitter.finagle.builder.ClientBuilder;
-import com.twitter.finagle.thrift.ClientId;
-import com.twitter.finagle.thrift.ThriftClientFramedCodec;
-import com.twitter.finagle.thrift.ThriftClientRequest;
-import com.twitter.util.Duration;
-import com.twitter.util.Future;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import scala.Option;
-import scala.runtime.BoxedUnit;
-
-/**
- * Client talks to a single proxy.
- */
-public class ProxyClient {
-
-  /**
-   * Builder to build a proxy client talking to given host <code>address</code>.
-   */
-  public interface Builder {
-        /**
-         * Build a proxy client to <code>address</code>.
-         *
-         * @param address
-         *          proxy address
-         * @return proxy client
-         */
-        ProxyClient build(SocketAddress address);
-    }
-
-    public static Builder newBuilder(String clientName,
-                                     ClientId clientId,
-                                     ClientBuilder clientBuilder,
-                                     ClientConfig clientConfig,
-                                     ClientStats clientStats) {
-        return new DefaultBuilder(clientName, clientId, clientBuilder, clientConfig, clientStats);
-    }
-
-    /**
-     * Default Builder for {@link ProxyClient}.
-     */
-    public static class DefaultBuilder implements Builder {
-
-        private final String clientName;
-        private final ClientId clientId;
-        private final ClientBuilder clientBuilder;
-        private final ClientStats clientStats;
-
-        private DefaultBuilder(String clientName,
-                               ClientId clientId,
-                               ClientBuilder clientBuilder,
-                               ClientConfig clientConfig,
-                               ClientStats clientStats) {
-            this.clientName = clientName;
-            this.clientId = clientId;
-            this.clientStats = clientStats;
-            // client builder
-            ClientBuilder builder = setDefaultSettings(
-                    null == clientBuilder ? getDefaultClientBuilder(clientConfig) : clientBuilder);
-            this.clientBuilder = configureThriftMux(builder, clientId, clientConfig);
-        }
-
-        @SuppressWarnings("unchecked")
-        private ClientBuilder configureThriftMux(ClientBuilder builder,
-                                                 ClientId clientId,
-                                                 ClientConfig clientConfig) {
-            if (clientConfig.getThriftMux()) {
-                return builder.stack(ThriftMux.client().withClientId(clientId));
-            } else {
-                return builder.codec(ThriftClientFramedCodec.apply(Option.apply(clientId)));
-            }
-        }
-
-        private ClientBuilder getDefaultClientBuilder(ClientConfig clientConfig) {
-            ClientBuilder builder = ClientBuilder.get()
-                .tcpConnectTimeout(Duration.fromMilliseconds(200))
-                .connectTimeout(Duration.fromMilliseconds(200))
-                .requestTimeout(Duration.fromSeconds(1));
-            if (!clientConfig.getThriftMux()) {
-                builder = builder.hostConnectionLimit(1);
-            }
-            return builder;
-        }
-
-        @SuppressWarnings("unchecked")
-        private ClientBuilder setDefaultSettings(ClientBuilder builder) {
-            return builder.name(clientName)
-                   .failFast(false)
-                   .noFailureAccrual()
-                   // disable retries on finagle client builder, as there is only one host per finagle client
-                   // we should throw exception immediately on first failure, so DL client could quickly detect
-                   // failures and retry other proxies.
-                   .retries(1)
-                   .keepAlive(true);
-        }
-
-        @Override
-        @SuppressWarnings("unchecked")
-        public ProxyClient build(SocketAddress address) {
-            Service<ThriftClientRequest, byte[]> client =
-                ClientBuilder.safeBuildFactory(
-                        clientBuilder
-                                .hosts((InetSocketAddress) address)
-                                .reportTo(clientStats.getFinagleStatsReceiver(address))
-                ).toService();
-            DistributedLogService.ServiceIface service =
-                    new DistributedLogService.ServiceToClient(client, new TBinaryProtocol.Factory());
-            return new ProxyClient(address, client, service);
-        }
-
-    }
-
-    private final SocketAddress address;
-    private final Service<ThriftClientRequest, byte[]> client;
-    private final DistributedLogService.ServiceIface service;
-
-    protected ProxyClient(SocketAddress address,
-                          Service<ThriftClientRequest, byte[]> client,
-                          DistributedLogService.ServiceIface service) {
-        this.address = address;
-        this.client  = client;
-        this.service = service;
-    }
-
-    public SocketAddress getAddress() {
-        return address;
-    }
-
-    public Service<ThriftClientRequest, byte[]> getClient() {
-        return client;
-    }
-
-    public DistributedLogService.ServiceIface getService() {
-        return service;
-    }
-
-    public Future<BoxedUnit> close() {
-        return client.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClientManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClientManager.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClientManager.java
deleted file mode 100644
index 17b70be..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClientManager.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.proxy;
-
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableMap;
-import org.apache.distributedlog.client.ClientConfig;
-import org.apache.distributedlog.client.stats.ClientStats;
-import org.apache.distributedlog.client.stats.OpStats;
-import org.apache.distributedlog.thrift.service.ClientInfo;
-import org.apache.distributedlog.thrift.service.ServerInfo;
-import com.twitter.util.FutureEventListener;
-import java.net.SocketAddress;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Manager manages clients (channels) to proxies.
- */
-public class ProxyClientManager implements TimerTask {
-
-    private static final Logger logger = LoggerFactory.getLogger(ProxyClientManager.class);
-
-    private final ClientConfig clientConfig;
-    private final ProxyClient.Builder clientBuilder;
-    private final HashedWheelTimer timer;
-    private final HostProvider hostProvider;
-    private volatile Timeout periodicHandshakeTask;
-    private final ConcurrentHashMap<SocketAddress, ProxyClient> address2Services =
-            new ConcurrentHashMap<SocketAddress, ProxyClient>();
-    private final CopyOnWriteArraySet<ProxyListener> proxyListeners =
-            new CopyOnWriteArraySet<ProxyListener>();
-    private volatile boolean closed = false;
-    private volatile boolean periodicHandshakeEnabled = true;
-    private final Stopwatch lastOwnershipSyncStopwatch;
-
-    private final OpStats handshakeStats;
-
-    public ProxyClientManager(ClientConfig clientConfig,
-                              ProxyClient.Builder clientBuilder,
-                              HashedWheelTimer timer,
-                              HostProvider hostProvider,
-                              ClientStats clientStats) {
-        this.clientConfig = clientConfig;
-        this.clientBuilder = clientBuilder;
-        this.timer = timer;
-        this.hostProvider = hostProvider;
-        this.handshakeStats = clientStats.getOpStats("handshake");
-        scheduleHandshake();
-        this.lastOwnershipSyncStopwatch = Stopwatch.createStarted();
-    }
-
-    private void scheduleHandshake() {
-        if (clientConfig.getPeriodicHandshakeIntervalMs() > 0) {
-            periodicHandshakeTask = timer.newTimeout(this,
-                    clientConfig.getPeriodicHandshakeIntervalMs(), TimeUnit.MILLISECONDS);
-        }
-    }
-
-    void setPeriodicHandshakeEnabled(boolean enabled) {
-        this.periodicHandshakeEnabled = enabled;
-    }
-
-    @Override
-    public void run(Timeout timeout) throws Exception {
-        if (timeout.isCancelled() || closed) {
-            return;
-        }
-        if (periodicHandshakeEnabled) {
-            final boolean syncOwnerships = lastOwnershipSyncStopwatch.elapsed(TimeUnit.MILLISECONDS)
-                >= clientConfig.getPeriodicOwnershipSyncIntervalMs();
-
-            final Set<SocketAddress> hostsSnapshot = hostProvider.getHosts();
-            final AtomicInteger numHosts = new AtomicInteger(hostsSnapshot.size());
-            final AtomicInteger numStreams = new AtomicInteger(0);
-            final AtomicInteger numSuccesses = new AtomicInteger(0);
-            final AtomicInteger numFailures = new AtomicInteger(0);
-            final ConcurrentMap<SocketAddress, Integer> streamDistributions =
-                    new ConcurrentHashMap<SocketAddress, Integer>();
-            final Stopwatch stopwatch = Stopwatch.createStarted();
-            for (SocketAddress host : hostsSnapshot) {
-                final SocketAddress address = host;
-                final ProxyClient client = getClient(address);
-                handshake(address, client, new FutureEventListener<ServerInfo>() {
-                    @Override
-                    public void onSuccess(ServerInfo serverInfo) {
-                        numStreams.addAndGet(serverInfo.getOwnershipsSize());
-                        numSuccesses.incrementAndGet();
-                        notifyHandshakeSuccess(address, client, serverInfo, false, stopwatch);
-                        if (clientConfig.isHandshakeTracingEnabled()) {
-                            streamDistributions.putIfAbsent(address, serverInfo.getOwnershipsSize());
-                        }
-                        complete();
-                    }
-
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        numFailures.incrementAndGet();
-                        notifyHandshakeFailure(address, client, cause, stopwatch);
-                        complete();
-                    }
-
-                    private void complete() {
-                        if (0 == numHosts.decrementAndGet()) {
-                            if (syncOwnerships) {
-                                logger.info("Periodic handshaked with {} hosts : {} streams returned,"
-                                    + " {} hosts succeeded, {} hosts failed",
-                                    new Object[] {
-                                        hostsSnapshot.size(),
-                                        numStreams.get(),
-                                        numSuccesses.get(),
-                                        numFailures.get()});
-                                if (clientConfig.isHandshakeTracingEnabled()) {
-                                    logger.info("Periodic handshaked stream distribution : {}", streamDistributions);
-                                }
-                            }
-                        }
-                    }
-                }, false, syncOwnerships);
-            }
-
-            if (syncOwnerships) {
-                lastOwnershipSyncStopwatch.reset().start();
-            }
-        }
-        scheduleHandshake();
-    }
-
-    /**
-     * Register a proxy <code>listener</code> on proxy related changes.
-     *
-     * @param listener
-     *          proxy listener
-     */
-    public void registerProxyListener(ProxyListener listener) {
-        proxyListeners.add(listener);
-    }
-
-    private void notifyHandshakeSuccess(SocketAddress address,
-                                        ProxyClient client,
-                                        ServerInfo serverInfo,
-                                        boolean logging,
-                                        Stopwatch stopwatch) {
-        if (logging) {
-            if (null != serverInfo && serverInfo.isSetOwnerships()) {
-                logger.info("Handshaked with {} : {} ownerships returned.",
-                        address, serverInfo.getOwnerships().size());
-            } else {
-                logger.info("Handshaked with {} : no ownerships returned", address);
-            }
-        }
-        handshakeStats.completeRequest(address, stopwatch.elapsed(TimeUnit.MICROSECONDS), 1);
-        for (ProxyListener listener : proxyListeners) {
-            listener.onHandshakeSuccess(address, client, serverInfo);
-        }
-    }
-
-    private void notifyHandshakeFailure(SocketAddress address,
-                                        ProxyClient client,
-                                        Throwable cause,
-                                        Stopwatch stopwatch) {
-        handshakeStats.failRequest(address, stopwatch.elapsed(TimeUnit.MICROSECONDS), 1);
-        for (ProxyListener listener : proxyListeners) {
-            listener.onHandshakeFailure(address, client, cause);
-        }
-    }
-
-    /**
-     * Retrieve a client to proxy <code>address</code>.
-     *
-     * @param address
-     *          proxy address
-     * @return proxy client
-     */
-    public ProxyClient getClient(final SocketAddress address) {
-        ProxyClient sc = address2Services.get(address);
-        if (null != sc) {
-            return sc;
-        }
-        return createClient(address);
-    }
-
-    /**
-     * Remove the client to proxy <code>address</code>.
-     *
-     * @param address
-     *          proxy address
-     */
-    public void removeClient(SocketAddress address) {
-        ProxyClient sc = address2Services.remove(address);
-        if (null != sc) {
-            logger.info("Removed host {}.", address);
-            sc.close();
-        }
-    }
-
-    /**
-     * Remove the client <code>sc</code> to proxy <code>address</code>.
-     *
-     * @param address
-     *          proxy address
-     * @param sc
-     *          proxy client
-     */
-    public void removeClient(SocketAddress address, ProxyClient sc) {
-        if (address2Services.remove(address, sc)) {
-            logger.info("Remove client {} to host {}.", sc, address);
-            sc.close();
-        }
-    }
-
-    /**
-     * Create a client to proxy <code>address</code>.
-     *
-     * @param address
-     *          proxy address
-     * @return proxy client
-     */
-    public ProxyClient createClient(final SocketAddress address) {
-        final ProxyClient sc = clientBuilder.build(address);
-        ProxyClient oldSC = address2Services.putIfAbsent(address, sc);
-        if (null != oldSC) {
-            sc.close();
-            return oldSC;
-        } else {
-            final Stopwatch stopwatch = Stopwatch.createStarted();
-            FutureEventListener<ServerInfo> listener = new FutureEventListener<ServerInfo>() {
-                @Override
-                public void onSuccess(ServerInfo serverInfo) {
-                    notifyHandshakeSuccess(address, sc, serverInfo, true, stopwatch);
-                }
-                @Override
-                public void onFailure(Throwable cause) {
-                    notifyHandshakeFailure(address, sc, cause, stopwatch);
-                }
-            };
-            // send a ping messaging after creating connections.
-            handshake(address, sc, listener, true, true);
-            return sc;
-        }
-    }
-
-    /**
-     * Handshake with a given proxy.
-     *
-     * @param address
-     *          proxy address
-     * @param sc
-     *          proxy client
-     * @param listener
-     *          listener on handshake result
-     */
-    private void handshake(SocketAddress address,
-                           ProxyClient sc,
-                           FutureEventListener<ServerInfo> listener,
-                           boolean logging,
-                           boolean getOwnerships) {
-        if (clientConfig.getHandshakeWithClientInfo()) {
-            ClientInfo clientInfo = new ClientInfo();
-            clientInfo.setGetOwnerships(getOwnerships);
-            clientInfo.setStreamNameRegex(clientConfig.getStreamNameRegex());
-            if (logging) {
-                logger.info("Handshaking with {} : {}", address, clientInfo);
-            }
-            sc.getService().handshakeWithClientInfo(clientInfo)
-                    .addEventListener(listener);
-        } else {
-            if (logging) {
-                logger.info("Handshaking with {}", address);
-            }
-            sc.getService().handshake().addEventListener(listener);
-        }
-    }
-
-    /**
-     * Handshake with all proxies.
-     *
-     * <p>NOTE: this is a synchronous call.
-     */
-    public void handshake() {
-        Set<SocketAddress> hostsSnapshot = hostProvider.getHosts();
-        logger.info("Handshaking with {} hosts.", hostsSnapshot.size());
-        final CountDownLatch latch = new CountDownLatch(hostsSnapshot.size());
-        final Stopwatch stopwatch = Stopwatch.createStarted();
-        for (SocketAddress host: hostsSnapshot) {
-            final SocketAddress address = host;
-            final ProxyClient client = getClient(address);
-            handshake(address, client, new FutureEventListener<ServerInfo>() {
-                @Override
-                public void onSuccess(ServerInfo serverInfo) {
-                    notifyHandshakeSuccess(address, client, serverInfo, true, stopwatch);
-                    latch.countDown();
-                }
-                @Override
-                public void onFailure(Throwable cause) {
-                    notifyHandshakeFailure(address, client, cause, stopwatch);
-                    latch.countDown();
-                }
-            }, true, true);
-        }
-        try {
-            latch.await(1, TimeUnit.MINUTES);
-        } catch (InterruptedException e) {
-            logger.warn("Interrupted on handshaking with servers : ", e);
-        }
-    }
-
-    /**
-     * Return number of proxies managed by client manager.
-     *
-     * @return number of proxies managed by client manager.
-     */
-    public int getNumProxies() {
-        return address2Services.size();
-    }
-
-    /**
-     * Return all clients.
-     *
-     * @return all clients.
-     */
-    public Map<SocketAddress, ProxyClient> getAllClients() {
-        return ImmutableMap.copyOf(address2Services);
-    }
-
-    public void close() {
-        closed = true;
-        Timeout task = periodicHandshakeTask;
-        if (null != task) {
-            task.cancel();
-        }
-        for (ProxyClient sc : address2Services.values()) {
-            sc.close();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyListener.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyListener.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyListener.java
deleted file mode 100644
index 0a6b076..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyListener.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.proxy;
-
-import org.apache.distributedlog.thrift.service.ServerInfo;
-import java.net.SocketAddress;
-
-/**
- * Listener on server changes.
- */
-public interface ProxyListener {
-    /**
-     * When a proxy's server info changed, it would be notified.
-     *
-     * @param address
-     *          proxy address
-     * @param client
-     *          proxy client that executes handshaking
-     * @param serverInfo
-     *          proxy's server info
-     */
-    void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo);
-
-    /**
-     * Failed to handshake with a proxy.
-     *
-     * @param address
-     *          proxy address
-     * @param client
-     *          proxy client
-     * @param cause
-     *          failure reason
-     */
-    void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause);
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/package-info.java
deleted file mode 100644
index 4161afb..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Clients that interact with individual proxies.
- */
-package org.apache.distributedlog.client.proxy;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/DefaultRegionResolver.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/DefaultRegionResolver.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/DefaultRegionResolver.java
deleted file mode 100644
index 2ac5be3..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/DefaultRegionResolver.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.resolver;
-
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Default implementation of {@link RegionResolver}.
- */
-public class DefaultRegionResolver implements RegionResolver {
-
-    private static final String DEFAULT_REGION = "default-region";
-
-    private final Map<SocketAddress, String> regionOverrides =
-            new HashMap<SocketAddress, String>();
-    private final ConcurrentMap<SocketAddress, String> regionMap =
-            new ConcurrentHashMap<SocketAddress, String>();
-
-    public DefaultRegionResolver() {
-    }
-
-    public DefaultRegionResolver(Map<SocketAddress, String> regionOverrides) {
-        this.regionOverrides.putAll(regionOverrides);
-    }
-
-    @Override
-    public String resolveRegion(SocketAddress address) {
-        String region = regionMap.get(address);
-        if (null == region) {
-            region = doResolveRegion(address);
-            regionMap.put(address, region);
-        }
-        return region;
-    }
-
-    private String doResolveRegion(SocketAddress address) {
-        String region = regionOverrides.get(address);
-        if (null != region) {
-            return region;
-        }
-
-        String domainName;
-        if (address instanceof InetSocketAddress) {
-            InetSocketAddress iAddr = (InetSocketAddress) address;
-            domainName = iAddr.getHostName();
-        } else {
-            domainName = address.toString();
-        }
-        String[] parts = domainName.split("\\.");
-        if (parts.length <= 0) {
-            return DEFAULT_REGION;
-        }
-        String hostName = parts[0];
-        String[] labels = hostName.split("-");
-        if (labels.length != 4) {
-            return DEFAULT_REGION;
-        }
-        return labels[0];
-    }
-
-    @Override
-    public void removeCachedHost(SocketAddress address) {
-        regionMap.remove(address);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/RegionResolver.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/RegionResolver.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/RegionResolver.java
deleted file mode 100644
index 023799c..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/RegionResolver.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.resolver;
-
-import java.net.SocketAddress;
-
-/**
- * Resolve address to region.
- */
-public interface RegionResolver {
-
-    /**
-     * Resolve address to region.
-     *
-     * @param address
-     *          socket address
-     * @return region
-     */
-    String resolveRegion(SocketAddress address);
-
-    /**
-     * Remove cached host.
-     *
-     * @param address
-     *          socket address.
-     */
-    void removeCachedHost(SocketAddress address);
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/package-info.java
deleted file mode 100644
index 81cda2f..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Resolver to resolve network addresses.
- */
-package org.apache.distributedlog.client.resolver;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ConsistentHashRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ConsistentHashRoutingService.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ConsistentHashRoutingService.java
deleted file mode 100644
index 666fa31..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ConsistentHashRoutingService.java
+++ /dev/null
@@ -1,500 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.routing;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.common.zookeeper.ServerSet;
-import org.apache.distributedlog.service.DLSocketAddress;
-import com.twitter.finagle.ChannelException;
-import com.twitter.finagle.NoBrokersAvailableException;
-import com.twitter.finagle.stats.Counter;
-import com.twitter.finagle.stats.Gauge;
-import com.twitter.finagle.stats.NullStatsReceiver;
-import com.twitter.finagle.stats.StatsReceiver;
-import com.twitter.util.Function0;
-import java.net.SocketAddress;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.lang3.tuple.Pair;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.Seq;
-
-/**
- * Consistent Hashing Based {@link RoutingService}.
- */
-public class ConsistentHashRoutingService extends ServerSetRoutingService {
-
-    private static final Logger logger = LoggerFactory.getLogger(ConsistentHashRoutingService.class);
-
-    @Deprecated
-    public static ConsistentHashRoutingService of(ServerSetWatcher serverSetWatcher, int numReplicas) {
-        return new ConsistentHashRoutingService(serverSetWatcher, numReplicas, 300, NullStatsReceiver.get());
-    }
-
-    /**
-     * Builder helper class to build a consistent hash bashed {@link RoutingService}.
-     *
-     * @return builder to build a consistent hash based {@link RoutingService}.
-     */
-    public static Builder newBuilder() {
-        return new Builder();
-    }
-
-    /**
-     * Builder for building consistent hash based routing service.
-     */
-    public static class Builder implements RoutingService.Builder {
-
-        private ServerSet serverSet;
-        private boolean resolveFromName = false;
-        private int numReplicas;
-        private int blackoutSeconds = 300;
-        private StatsReceiver statsReceiver = NullStatsReceiver.get();
-
-        private Builder() {}
-
-        public Builder serverSet(ServerSet serverSet) {
-            this.serverSet = serverSet;
-            return this;
-        }
-
-        public Builder resolveFromName(boolean enabled) {
-            this.resolveFromName = enabled;
-            return this;
-        }
-
-        public Builder numReplicas(int numReplicas) {
-            this.numReplicas = numReplicas;
-            return this;
-        }
-
-        public Builder blackoutSeconds(int seconds) {
-            this.blackoutSeconds = seconds;
-            return this;
-        }
-
-        public Builder statsReceiver(StatsReceiver statsReceiver) {
-            this.statsReceiver = statsReceiver;
-            return this;
-        }
-
-        @Override
-        public RoutingService build() {
-            checkNotNull(serverSet, "No serverset provided.");
-            checkNotNull(statsReceiver, "No stats receiver provided.");
-            checkArgument(numReplicas > 0, "Invalid number of replicas : " + numReplicas);
-            return new ConsistentHashRoutingService(new TwitterServerSetWatcher(serverSet, resolveFromName),
-                numReplicas, blackoutSeconds, statsReceiver);
-        }
-    }
-
-    static class ConsistentHash {
-        private final HashFunction hashFunction;
-        private final int numOfReplicas;
-        private final SortedMap<Long, SocketAddress> circle;
-
-        // Stats
-        protected final Counter hostAddedCounter;
-        protected final Counter hostRemovedCounter;
-
-        ConsistentHash(HashFunction hashFunction,
-                       int numOfReplicas,
-                       StatsReceiver statsReceiver) {
-            this.hashFunction = hashFunction;
-            this.numOfReplicas = numOfReplicas;
-            this.circle = new TreeMap<Long, SocketAddress>();
-
-            this.hostAddedCounter = statsReceiver.counter0("adds");
-            this.hostRemovedCounter = statsReceiver.counter0("removes");
-        }
-
-        private String replicaName(int shardId, int replica, String address) {
-            if (shardId < 0) {
-                shardId = UNKNOWN_SHARD_ID;
-            }
-
-            StringBuilder sb = new StringBuilder(100);
-            sb.append("shard-");
-            sb.append(shardId);
-            sb.append('-');
-            sb.append(replica);
-            sb.append('-');
-            sb.append(address);
-
-            return sb.toString();
-        }
-
-        private Long replicaHash(int shardId, int replica, String address) {
-            return hashFunction.hashUnencodedChars(replicaName(shardId, replica, address)).asLong();
-        }
-
-        private Long replicaHash(int shardId, int replica, SocketAddress address) {
-            return replicaHash(shardId, replica, address.toString());
-        }
-
-        public synchronized void add(int shardId, SocketAddress address) {
-            String addressStr = address.toString();
-            for (int i = 0; i < numOfReplicas; i++) {
-                Long hash = replicaHash(shardId, i, addressStr);
-                circle.put(hash, address);
-            }
-            hostAddedCounter.incr();
-        }
-
-        public synchronized void remove(int shardId, SocketAddress address) {
-            for (int i = 0; i < numOfReplicas; i++) {
-                long hash = replicaHash(shardId, i, address);
-                SocketAddress oldAddress = circle.get(hash);
-                if (null != oldAddress && oldAddress.equals(address)) {
-                    circle.remove(hash);
-                }
-            }
-            hostRemovedCounter.incr();
-        }
-
-        public SocketAddress get(String key, RoutingContext rContext) {
-            long hash = hashFunction.hashUnencodedChars(key).asLong();
-            return find(hash, rContext);
-        }
-
-        private synchronized SocketAddress find(long hash, RoutingContext rContext) {
-            if (circle.isEmpty()) {
-                return null;
-            }
-
-            Iterator<Map.Entry<Long, SocketAddress>> iterator =
-                    circle.tailMap(hash).entrySet().iterator();
-            while (iterator.hasNext()) {
-                Map.Entry<Long, SocketAddress> entry = iterator.next();
-                if (!rContext.isTriedHost(entry.getValue())) {
-                    return entry.getValue();
-                }
-            }
-            // the tail map has been checked
-            iterator = circle.headMap(hash).entrySet().iterator();
-            while (iterator.hasNext()) {
-                Map.Entry<Long, SocketAddress> entry = iterator.next();
-                if (!rContext.isTriedHost(entry.getValue())) {
-                    return entry.getValue();
-                }
-            }
-
-            return null;
-        }
-
-        private synchronized Pair<Long, SocketAddress> get(long hash) {
-            if (circle.isEmpty()) {
-                return null;
-            }
-
-            if (!circle.containsKey(hash)) {
-                SortedMap<Long, SocketAddress> tailMap = circle.tailMap(hash);
-                hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
-            }
-            return Pair.of(hash, circle.get(hash));
-        }
-
-        synchronized void dumpHashRing() {
-            for (Map.Entry<Long, SocketAddress> entry : circle.entrySet()) {
-                logger.info(entry.getKey() + " : " + entry.getValue());
-            }
-        }
-
-    }
-
-    class BlackoutHost implements TimerTask {
-        final int shardId;
-        final SocketAddress address;
-
-        BlackoutHost(int shardId, SocketAddress address) {
-            this.shardId = shardId;
-            this.address = address;
-            numBlackoutHosts.incrementAndGet();
-        }
-
-        @Override
-        public void run(Timeout timeout) throws Exception {
-            numBlackoutHosts.decrementAndGet();
-            if (!timeout.isExpired()) {
-                return;
-            }
-            Set<SocketAddress> removedList = new HashSet<SocketAddress>();
-            boolean joined;
-            // add the shard back
-            synchronized (shardId2Address) {
-                SocketAddress curHost = shardId2Address.get(shardId);
-                if (null != curHost) {
-                    // there is already new shard joint, so drop the host.
-                    logger.info("Blackout Shard {} ({}) was already replaced by {} permanently.",
-                            new Object[] { shardId, address, curHost });
-                    joined = false;
-                } else {
-                    join(shardId, address, removedList);
-                    joined = true;
-                }
-            }
-            if (joined) {
-                for (RoutingListener listener : listeners) {
-                    listener.onServerJoin(address);
-                }
-            } else {
-                for (RoutingListener listener : listeners) {
-                    listener.onServerLeft(address);
-                }
-            }
-        }
-    }
-
-    protected final HashedWheelTimer hashedWheelTimer;
-    protected final HashFunction hashFunction = Hashing.md5();
-    protected final ConsistentHash circle;
-    protected final Map<Integer, SocketAddress> shardId2Address =
-            new HashMap<Integer, SocketAddress>();
-    protected final Map<SocketAddress, Integer> address2ShardId =
-            new HashMap<SocketAddress, Integer>();
-
-    // blackout period
-    protected final int blackoutSeconds;
-
-    // stats
-    protected final StatsReceiver statsReceiver;
-    protected final AtomicInteger numBlackoutHosts;
-    protected final Gauge numBlackoutHostsGauge;
-    protected final Gauge numHostsGauge;
-
-    private static final int UNKNOWN_SHARD_ID = -1;
-
-    ConsistentHashRoutingService(ServerSetWatcher serverSetWatcher,
-                                 int numReplicas,
-                                 int blackoutSeconds,
-                                 StatsReceiver statsReceiver) {
-        super(serverSetWatcher);
-        this.circle = new ConsistentHash(hashFunction, numReplicas, statsReceiver.scope("ring"));
-        this.hashedWheelTimer = new HashedWheelTimer(new ThreadFactoryBuilder()
-                .setNameFormat("ConsistentHashRoutingService-Timer-%d").build());
-        this.blackoutSeconds = blackoutSeconds;
-        // stats
-        this.statsReceiver = statsReceiver;
-        this.numBlackoutHosts = new AtomicInteger(0);
-        this.numBlackoutHostsGauge = this.statsReceiver.addGauge(gaugeName("num_blackout_hosts"),
-                new Function0<Object>() {
-                    @Override
-                    public Object apply() {
-                        return (float) numBlackoutHosts.get();
-                    }
-                });
-        this.numHostsGauge = this.statsReceiver.addGauge(gaugeName("num_hosts"),
-                new Function0<Object>() {
-                    @Override
-                    public Object apply() {
-                        return (float) address2ShardId.size();
-                    }
-                });
-    }
-
-    private static Seq<String> gaugeName(String name) {
-        return scala.collection.JavaConversions.asScalaBuffer(Arrays.asList(name)).toList();
-    }
-
-    @Override
-    public void startService() {
-        super.startService();
-        this.hashedWheelTimer.start();
-    }
-
-    @Override
-    public void stopService() {
-        this.hashedWheelTimer.stop();
-        super.stopService();
-    }
-
-    @Override
-    public Set<SocketAddress> getHosts() {
-        synchronized (shardId2Address) {
-            return ImmutableSet.copyOf(address2ShardId.keySet());
-        }
-    }
-
-    @Override
-    public SocketAddress getHost(String key, RoutingContext rContext)
-            throws NoBrokersAvailableException {
-        SocketAddress host = circle.get(key, rContext);
-        if (null != host) {
-            return host;
-        }
-        throw new NoBrokersAvailableException("No host found for " + key + ", routing context : " + rContext);
-    }
-
-    @Override
-    public void removeHost(SocketAddress host, Throwable reason) {
-        removeHostInternal(host, Optional.of(reason));
-    }
-
-    private void removeHostInternal(SocketAddress host, Optional<Throwable> reason) {
-        synchronized (shardId2Address) {
-            Integer shardId = address2ShardId.remove(host);
-            if (null != shardId) {
-                SocketAddress curHost = shardId2Address.get(shardId);
-                if (null != curHost && curHost.equals(host)) {
-                    shardId2Address.remove(shardId);
-                }
-                circle.remove(shardId, host);
-                if (reason.isPresent()) {
-                    if (reason.get() instanceof ChannelException) {
-                        logger.info("Shard {} ({}) left due to ChannelException, black it out for {} seconds"
-                            + " (message = {})",
-                            new Object[] { shardId, host, blackoutSeconds, reason.get().toString() });
-                        BlackoutHost blackoutHost = new BlackoutHost(shardId, host);
-                        hashedWheelTimer.newTimeout(blackoutHost, blackoutSeconds, TimeUnit.SECONDS);
-                    } else {
-                        logger.info("Shard {} ({}) left due to exception {}",
-                                new Object[] { shardId, host, reason.get().toString() });
-                    }
-                } else {
-                    logger.info("Shard {} ({}) left after server set change",
-                                shardId, host);
-                }
-            } else if (reason.isPresent()) {
-                logger.info("Node {} left due to exception {}", host, reason.get().toString());
-            } else {
-                logger.info("Node {} left after server set change", host);
-            }
-        }
-    }
-
-    /**
-     * The caller should synchronize on <i>shardId2Address</i>.
-     * @param shardId
-     *          Shard id of new host joined.
-     * @param newHost
-     *          New host joined.
-     * @param removedList
-     *          Old hosts to remove
-     */
-    private void join(int shardId, SocketAddress newHost, Set<SocketAddress> removedList) {
-        SocketAddress oldHost = shardId2Address.put(shardId, newHost);
-        if (null != oldHost) {
-            // remove the old host only when a new shard is kicked in to replace it.
-            address2ShardId.remove(oldHost);
-            circle.remove(shardId, oldHost);
-            removedList.add(oldHost);
-            logger.info("Shard {} ({}) left permanently.", shardId, oldHost);
-        }
-        address2ShardId.put(newHost, shardId);
-        circle.add(shardId, newHost);
-        logger.info("Shard {} ({}) joined to replace ({}).",
-                    new Object[] { shardId, newHost, oldHost });
-    }
-
-    @Override
-    protected synchronized void performServerSetChange(ImmutableSet<DLSocketAddress> serviceInstances) {
-        Set<SocketAddress> joinedList = new HashSet<SocketAddress>();
-        Set<SocketAddress> removedList = new HashSet<SocketAddress>();
-
-        Map<Integer, SocketAddress> newMap = new HashMap<Integer, SocketAddress>();
-        synchronized (shardId2Address) {
-            for (DLSocketAddress serviceInstance : serviceInstances) {
-                if (serviceInstance.getShard() >= 0) {
-                    newMap.put(serviceInstance.getShard(), serviceInstance.getSocketAddress());
-                } else {
-                    Integer shard = address2ShardId.get(serviceInstance.getSocketAddress());
-                    if (null == shard) {
-                        // Assign a random negative shardId
-                        int shardId;
-                        do {
-                            shardId = Math.min(-1 , (int) (Math.random() * Integer.MIN_VALUE));
-                        } while (null != shardId2Address.get(shardId));
-                        shard = shardId;
-                    }
-                    newMap.put(shard, serviceInstance.getSocketAddress());
-                }
-            }
-        }
-
-        Map<Integer, SocketAddress> left;
-        synchronized (shardId2Address) {
-            MapDifference<Integer, SocketAddress> difference =
-                    Maps.difference(shardId2Address, newMap);
-            left = difference.entriesOnlyOnLeft();
-            for (Map.Entry<Integer, SocketAddress> shardEntry : left.entrySet()) {
-                int shard = shardEntry.getKey();
-                if (shard >= 0) {
-                    SocketAddress host = shardId2Address.get(shard);
-                    if (null != host) {
-                        // we don't remove those hosts that just disappered on serverset proactively,
-                        // since it might be just because serverset become flaky
-                        // address2ShardId.remove(host);
-                        // circle.remove(shard, host);
-                        logger.info("Shard {} ({}) left temporarily.", shard, host);
-                    }
-                } else {
-                    // shard id is negative - they are resolved from finagle name, which instances don't have shard id
-                    // in this case, if they are removed from serverset, we removed them directly
-                    SocketAddress host = shardEntry.getValue();
-                    if (null != host) {
-                        removeHostInternal(host, Optional.<Throwable>absent());
-                        removedList.add(host);
-                    }
-                }
-            }
-            // we need to find if any shards are replacing old shards
-            for (Map.Entry<Integer, SocketAddress> shard : newMap.entrySet()) {
-                SocketAddress oldHost = shardId2Address.get(shard.getKey());
-                SocketAddress newHost = shard.getValue();
-                if (!newHost.equals(oldHost)) {
-                    join(shard.getKey(), newHost, removedList);
-                    joinedList.add(newHost);
-                }
-            }
-        }
-
-        for (SocketAddress addr : removedList) {
-            for (RoutingListener listener : listeners) {
-                listener.onServerLeft(addr);
-            }
-        }
-
-        for (SocketAddress addr : joinedList) {
-            for (RoutingListener listener : listeners) {
-                listener.onServerJoin(addr);
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/NameServerSet.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/NameServerSet.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/NameServerSet.java
deleted file mode 100644
index e51eb1e..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/NameServerSet.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.routing;
-
-import com.google.common.collect.ImmutableSet;
-import com.twitter.common.base.Command;
-import com.twitter.common.base.Commands;
-import com.twitter.common.zookeeper.Group;
-import com.twitter.common.zookeeper.ServerSet;
-import com.twitter.finagle.Addr;
-import com.twitter.finagle.Address;
-import com.twitter.finagle.Name;
-import com.twitter.finagle.Resolver$;
-import com.twitter.thrift.Endpoint;
-import com.twitter.thrift.ServiceInstance;
-import com.twitter.thrift.Status;
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-/**
- * Finagle Name based {@link ServerSet} implementation.
- */
-class NameServerSet implements ServerSet {
-
-    private static final Logger logger = LoggerFactory.getLogger(NameServerSet.class);
-
-    private volatile Set<HostChangeMonitor<ServiceInstance>> watchers =
-        new HashSet<HostChangeMonitor<ServiceInstance>>();
-    private volatile ImmutableSet<ServiceInstance> hostSet = ImmutableSet.of();
-    private AtomicBoolean resolutionPending = new AtomicBoolean(true);
-
-    public NameServerSet(String nameStr) {
-        Name name;
-        try {
-            name = Resolver$.MODULE$.eval(nameStr);
-        } catch (Exception exc) {
-            logger.error("Exception in Resolver.eval for name {}", nameStr, exc);
-            // Since this is called from various places that dont handle specific exceptions,
-            // we have no option than to throw a runtime exception to halt the control flow
-            // This should only happen in case of incorrect configuration. Having a log message
-            // would help identify the problem during tests
-            throw new RuntimeException(exc);
-        }
-        initialize(name);
-    }
-
-    public NameServerSet(Name name) {
-        initialize(name);
-    }
-
-    private void initialize(Name name) {
-        if (name instanceof TestName) {
-            ((TestName) name).changes(new AbstractFunction1<Addr, BoxedUnit>() {
-                @Override
-                public BoxedUnit apply(Addr varAddr) {
-                    return NameServerSet.this.respondToChanges(varAddr);
-                }
-            });
-        } else if (name instanceof Name.Bound) {
-            ((Name.Bound) name).addr().changes().respond(new AbstractFunction1<Addr, BoxedUnit>() {
-                @Override
-                public BoxedUnit apply(Addr varAddr) {
-                    return NameServerSet.this.respondToChanges(varAddr);
-                }
-            });
-        } else {
-            logger.error("NameServerSet only supports Name.Bound. While the resolved name {} was {}",
-                name, name.getClass());
-            throw new UnsupportedOperationException("NameServerSet only supports Name.Bound");
-        }
-    }
-
-    private ServiceInstance endpointAddressToServiceInstance(Address endpointAddress) {
-        if (endpointAddress instanceof Address.Inet) {
-            InetSocketAddress inetSocketAddress = ((Address.Inet) endpointAddress).addr();
-            Endpoint endpoint = new Endpoint(inetSocketAddress.getHostString(), inetSocketAddress.getPort());
-            HashMap<String, Endpoint> map = new HashMap<String, Endpoint>();
-            map.put("thrift", endpoint);
-            return new ServiceInstance(
-                endpoint,
-                map,
-                Status.ALIVE);
-        } else {
-            logger.error("We expect InetSocketAddress while the resolved address {} was {}",
-                        endpointAddress, endpointAddress.getClass());
-            throw new UnsupportedOperationException("invalid endpoint address: " + endpointAddress);
-        }
-    }
-
-
-    private BoxedUnit respondToChanges(Addr addr) {
-        ImmutableSet<ServiceInstance> oldHostSet = ImmutableSet.copyOf(hostSet);
-
-        ImmutableSet<ServiceInstance> newHostSet = oldHostSet;
-
-        if (addr instanceof Addr.Bound) {
-            scala.collection.immutable.Set<Address> endpointAddresses = ((Addr.Bound) addr).addrs();
-            scala.collection.Iterator<Address> endpointAddressesIterator = endpointAddresses.toIterator();
-            HashSet<ServiceInstance> serviceInstances = new HashSet<ServiceInstance>();
-            while (endpointAddressesIterator.hasNext()) {
-                serviceInstances.add(endpointAddressToServiceInstance(endpointAddressesIterator.next()));
-            }
-            newHostSet = ImmutableSet.copyOf(serviceInstances);
-
-        } else if (addr instanceof Addr.Failed) {
-            logger.error("Name resolution failed", ((Addr.Failed) addr).cause());
-            newHostSet = ImmutableSet.of();
-        } else if (addr.toString().equals("Pending")) {
-            logger.info("Name resolution pending");
-            newHostSet = oldHostSet;
-        } else if (addr.toString().equals("Neg")) {
-            newHostSet = ImmutableSet.of();
-        } else {
-            logger.error("Invalid Addr type: {}", addr.getClass().getName());
-            throw new UnsupportedOperationException("Invalid Addr type:" + addr.getClass().getName());
-        }
-
-        // Reference comparison is valid as the sets are immutable
-        if (oldHostSet != newHostSet) {
-            logger.info("NameServerSet updated: {} -> {}", hostSetToString(oldHostSet), hostSetToString(newHostSet));
-            resolutionPending.set(false);
-            hostSet = newHostSet;
-            synchronized (watchers) {
-                for (HostChangeMonitor<ServiceInstance> watcher: watchers) {
-                    watcher.onChange(newHostSet);
-                }
-            }
-
-        }
-
-        return BoxedUnit.UNIT;
-    }
-
-
-    private String hostSetToString(ImmutableSet<ServiceInstance> hostSet) {
-        StringBuilder result = new StringBuilder();
-        result.append("(");
-        for (ServiceInstance serviceInstance : hostSet) {
-            Endpoint endpoint = serviceInstance.getServiceEndpoint();
-            result.append(String.format(" %s:%d", endpoint.getHost(), endpoint.getPort()));
-        }
-        result.append(" )");
-
-        return result.toString();
-    }
-
-
-    /**
-     * Attempts to join a server set for this logical service group.
-     *
-     * @param endpoint the primary service endpoint
-     * @param additionalEndpoints and additional endpoints keyed by their logical name
-     * @param status the current service status
-     * @return an EndpointStatus object that allows the endpoint to adjust its status
-     * @throws Group.JoinException if there was a problem joining the server set
-     * @throws InterruptedException if interrupted while waiting to join the server set
-     * @deprecated The status field is deprecated. Please use {@link #join(java.net.InetSocketAddress, java.util.Map)}
-     */
-    @Override
-    public EndpointStatus join(InetSocketAddress endpoint,
-                               Map<String, InetSocketAddress> additionalEndpoints,
-                               Status status)
-            throws Group.JoinException, InterruptedException {
-        throw new UnsupportedOperationException("NameServerSet does not support join");
-    }
-
-    /**
-     * Attempts to join a server set for this logical service group.
-     *
-     * @param endpoint the primary service endpoint
-     * @param additionalEndpoints and additional endpoints keyed by their logical name
-     * @return an EndpointStatus object that allows the endpoint to adjust its status
-     * @throws Group.JoinException if there was a problem joining the server set
-     * @throws InterruptedException if interrupted while waiting to join the server set
-     */
-    @Override
-    public EndpointStatus join(InetSocketAddress endpoint, Map<String, InetSocketAddress> additionalEndpoints)
-            throws Group.JoinException, InterruptedException {
-        throw new UnsupportedOperationException("NameServerSet does not support join");
-    }
-
-    /**
-     * Attempts to join a server set for this logical service group.
-     *
-     * @param endpoint the primary service endpoint
-     * @param additionalEndpoints and additional endpoints keyed by their logical name
-     * @param shardId Unique shard identifier for this member of the service.
-     * @return an EndpointStatus object that allows the endpoint to adjust its status
-     * @throws Group.JoinException if there was a problem joining the server set
-     * @throws InterruptedException if interrupted while waiting to join the server set
-     */
-    @Override
-    public EndpointStatus join(InetSocketAddress endpoint,
-                               Map<String, InetSocketAddress> additionalEndpoints,
-                               int shardId)
-            throws Group.JoinException, InterruptedException {
-        throw new UnsupportedOperationException("NameServerSet does not support join");
-    }
-
-    /**
-     * Registers a monitor to receive change notices for this server set as long as this jvm process
-     * is alive.  Blocks until the initial server set can be gathered and delivered to the monitor.
-     * The monitor will be notified if the membership set or parameters of existing members have
-     * changed.
-     *
-     * @param monitor the server set monitor to call back when the host set changes
-     * @throws com.twitter.common.net.pool.DynamicHostSet.MonitorException if there is a problem monitoring the host set
-     * @deprecated Deprecated in favor of {@link #watch(com.twitter.common.net.pool.DynamicHostSet.HostChangeMonitor)}
-     */
-    @Deprecated
-    @Override
-    public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
-        throw new UnsupportedOperationException("NameServerSet does not support monitor");
-    }
-
-    /**
-     * Registers a monitor to receive change notices for this server set as long as this jvm process
-     * is alive.  Blocks until the initial server set can be gathered and delivered to the monitor.
-     * The monitor will be notified if the membership set or parameters of existing members have
-     * changed.
-     *
-     * @param monitor the server set monitor to call back when the host set changes
-     * @return A command which, when executed, will stop monitoring the host set.
-     * @throws com.twitter.common.net.pool.DynamicHostSet.MonitorException if there is a problem monitoring the host set
-     */
-    @Override
-    public Command watch(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
-        // First add the monitor to the watchers so that it does not miss any changes and invoke
-        // the onChange method
-        synchronized (watchers) {
-            watchers.add(monitor);
-        }
-
-        if (resolutionPending.compareAndSet(false, false)) {
-            monitor.onChange(hostSet);
-        }
-
-        return Commands.NOOP; // Return value is not used
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RegionsRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RegionsRoutingService.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RegionsRoutingService.java
deleted file mode 100644
index d71cee3..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RegionsRoutingService.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.routing;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.Sets;
-import org.apache.distributedlog.client.resolver.RegionResolver;
-import com.twitter.finagle.NoBrokersAvailableException;
-import com.twitter.finagle.stats.NullStatsReceiver;
-import com.twitter.finagle.stats.StatsReceiver;
-import java.net.SocketAddress;
-import java.util.Set;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Chain multiple routing services.
- */
-public class RegionsRoutingService implements RoutingService {
-
-    private static final Logger logger = LoggerFactory.getLogger(RegionsRoutingService.class);
-
-    /**
-     * Create a multiple regions routing services based on a list of region routing {@code services}.
-     *
-     * <p>It is deprecated. Please use {@link Builder} to build multiple regions routing service.
-     *
-     * @param regionResolver region resolver
-     * @param services a list of region routing services.
-     * @return multiple regions routing service
-     * @see Builder
-     */
-    @Deprecated
-    public static RegionsRoutingService of(RegionResolver regionResolver,
-                                         RoutingService...services) {
-        return new RegionsRoutingService(regionResolver, services);
-    }
-
-    /**
-     * Create a builder to build a multiple-regions routing service.
-     *
-     * @return builder to build a multiple-regions routing service.
-     */
-    public static Builder newBuilder() {
-        return new Builder();
-    }
-
-    /**
-     * Builder to build a multiple-regions routing service.
-     */
-    public static class Builder implements RoutingService.Builder {
-
-        private RegionResolver resolver;
-        private RoutingService.Builder[] routingServiceBuilders;
-        private StatsReceiver statsReceiver = NullStatsReceiver.get();
-
-        private Builder() {}
-
-        public Builder routingServiceBuilders(RoutingService.Builder...builders) {
-            this.routingServiceBuilders = builders;
-            return this;
-        }
-
-        public Builder resolver(RegionResolver regionResolver) {
-            this.resolver = regionResolver;
-            return this;
-        }
-
-        @Override
-        public RoutingService.Builder statsReceiver(StatsReceiver statsReceiver) {
-            this.statsReceiver = statsReceiver;
-            return this;
-        }
-
-        @Override
-        public RegionsRoutingService build() {
-            checkNotNull(routingServiceBuilders, "No routing service builder provided.");
-            checkNotNull(resolver, "No region resolver provided.");
-            checkNotNull(statsReceiver, "No stats receiver provided");
-            RoutingService[] services = new RoutingService[routingServiceBuilders.length];
-            for (int i = 0; i < services.length; i++) {
-                String statsScope;
-                if (0 == i) {
-                    statsScope = "local";
-                } else {
-                    statsScope = "remote_" + i;
-                }
-                services[i] = routingServiceBuilders[i]
-                        .statsReceiver(statsReceiver.scope(statsScope))
-                        .build();
-            }
-            return new RegionsRoutingService(resolver, services);
-        }
-    }
-
-    protected final RegionResolver regionResolver;
-    protected final RoutingService[] routingServices;
-
-    private RegionsRoutingService(RegionResolver resolver,
-                                  RoutingService[] routingServices) {
-        this.regionResolver = resolver;
-        this.routingServices = routingServices;
-    }
-
-    @Override
-    public Set<SocketAddress> getHosts() {
-        Set<SocketAddress> hosts = Sets.newHashSet();
-        for (RoutingService rs : routingServices) {
-            hosts.addAll(rs.getHosts());
-        }
-        return hosts;
-    }
-
-    @Override
-    public void startService() {
-        for (RoutingService service : routingServices) {
-            service.startService();
-        }
-        logger.info("Regions Routing Service Started");
-    }
-
-    @Override
-    public void stopService() {
-        for (RoutingService service : routingServices) {
-            service.stopService();
-        }
-        logger.info("Regions Routing Service Stopped");
-    }
-
-    @Override
-    public RoutingService registerListener(RoutingListener listener) {
-        for (RoutingService service : routingServices) {
-            service.registerListener(listener);
-        }
-        return this;
-    }
-
-    @Override
-    public RoutingService unregisterListener(RoutingListener listener) {
-        for (RoutingService service : routingServices) {
-            service.registerListener(listener);
-        }
-        return this;
-    }
-
-    @Override
-    public SocketAddress getHost(String key, RoutingContext routingContext)
-            throws NoBrokersAvailableException {
-        for (RoutingService service : routingServices) {
-            try {
-                SocketAddress addr = service.getHost(key, routingContext);
-                if (routingContext.hasUnavailableRegions()) {
-                    // current region is unavailable
-                    String region = regionResolver.resolveRegion(addr);
-                    if (routingContext.isUnavailableRegion(region)) {
-                        continue;
-                    }
-                }
-                if (!routingContext.isTriedHost(addr)) {
-                    return addr;
-                }
-            } catch (NoBrokersAvailableException nbae) {
-                // if there isn't broker available in current service, try next service.
-                logger.debug("No brokers available in region {} : ", service, nbae);
-            }
-        }
-        throw new NoBrokersAvailableException("No host found for " + key + ", routing context : " + routingContext);
-    }
-
-    @Override
-    public void removeHost(SocketAddress address, Throwable reason) {
-        for (RoutingService service : routingServices) {
-            service.removeHost(address, reason);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingService.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingService.java
deleted file mode 100644
index ad73c17..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingService.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.routing;
-
-import org.apache.distributedlog.client.resolver.RegionResolver;
-import org.apache.distributedlog.thrift.service.StatusCode;
-import com.twitter.finagle.NoBrokersAvailableException;
-import com.twitter.finagle.stats.StatsReceiver;
-import java.net.SocketAddress;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Routing Service provides mechanism how to route requests.
- */
-public interface RoutingService {
-
-    /**
-     * Builder to build routing service.
-     */
-    interface Builder {
-
-        /**
-         * Build routing service with stats receiver.
-         *
-         * @param statsReceiver
-         *          stats receiver
-         * @return built routing service
-         */
-        Builder statsReceiver(StatsReceiver statsReceiver);
-
-        /**
-         * Build the routing service.
-         *
-         * @return built routing service
-         */
-        RoutingService build();
-
-    }
-
-    /**
-     * Listener for server changes on routing service.
-     */
-    interface RoutingListener {
-        /**
-         * Trigger when server left.
-         *
-         * @param address left server.
-         */
-        void onServerLeft(SocketAddress address);
-
-        /**
-         * Trigger when server joint.
-         *
-         * @param address joint server.
-         */
-        void onServerJoin(SocketAddress address);
-    }
-
-    /**
-     * Routing Context of a request.
-     */
-    class RoutingContext {
-
-        public static RoutingContext of(RegionResolver resolver) {
-            return new RoutingContext(resolver);
-        }
-
-        final RegionResolver regionResolver;
-        final Map<SocketAddress, StatusCode> triedHosts;
-        final Set<String> unavailableRegions;
-
-        private RoutingContext(RegionResolver regionResolver) {
-            this.regionResolver = regionResolver;
-            this.triedHosts = new HashMap<SocketAddress, StatusCode>();
-            this.unavailableRegions = new HashSet<String>();
-        }
-
-        @Override
-        public synchronized String toString() {
-            return "(tried hosts=" + triedHosts + ")";
-        }
-
-        /**
-         * Add tried host to routing context.
-         *
-         * @param socketAddress
-         *          socket address of tried host.
-         * @param code
-         *          status code returned from tried host.
-         * @return routing context.
-         */
-        public synchronized RoutingContext addTriedHost(SocketAddress socketAddress, StatusCode code) {
-            this.triedHosts.put(socketAddress, code);
-            if (StatusCode.REGION_UNAVAILABLE == code) {
-                unavailableRegions.add(regionResolver.resolveRegion(socketAddress));
-            }
-            return this;
-        }
-
-        /**
-         * Is the host <i>address</i> already tried.
-         *
-         * @param address
-         *          socket address to check
-         * @return true if the address is already tried, otherwise false.
-         */
-        public synchronized boolean isTriedHost(SocketAddress address) {
-            return this.triedHosts.containsKey(address);
-        }
-
-        /**
-         * Whether encountered unavailable regions.
-         *
-         * @return true if encountered unavailable regions, otherwise false.
-         */
-        public synchronized boolean hasUnavailableRegions() {
-            return !unavailableRegions.isEmpty();
-        }
-
-        /**
-         * Whether the <i>region</i> is unavailable.
-         *
-         * @param region
-         *          region
-         * @return true if the region is unavailable, otherwise false.
-         */
-        public synchronized boolean isUnavailableRegion(String region) {
-            return unavailableRegions.contains(region);
-        }
-
-    }
-
-    /**
-     * Start routing service.
-     */
-    void startService();
-
-    /**
-     * Stop routing service.
-     */
-    void stopService();
-
-    /**
-     * Register routing listener.
-     *
-     * @param listener routing listener.
-     * @return routing service.
-     */
-    RoutingService registerListener(RoutingListener listener);
-
-    /**
-     * Unregister routing listener.
-     *
-     * @param listener routing listener.
-     * @return routing service.
-     */
-    RoutingService unregisterListener(RoutingListener listener);
-
-    /**
-     * Get all the hosts that available in routing service.
-     *
-     * @return all the hosts
-     */
-    Set<SocketAddress> getHosts();
-
-    /**
-     * Get the host to route the request by <i>key</i>.
-     *
-     * @param key
-     *          key to route the request.
-     * @param rContext
-     *          routing context.
-     * @return host to route the request
-     * @throws NoBrokersAvailableException
-     */
-    SocketAddress getHost(String key, RoutingContext rContext)
-            throws NoBrokersAvailableException;
-
-    /**
-     * Remove the host <i>address</i> for a specific <i>reason</i>.
-     *
-     * @param address
-     *          host address to remove
-     * @param reason
-     *          reason to remove the host
-     */
-    void removeHost(SocketAddress address, Throwable reason);
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingServiceProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingServiceProvider.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingServiceProvider.java
deleted file mode 100644
index 4ac22ce..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingServiceProvider.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.routing;
-
-import com.twitter.finagle.stats.StatsReceiver;
-
-class RoutingServiceProvider implements RoutingService.Builder {
-
-    final RoutingService routingService;
-
-    RoutingServiceProvider(RoutingService routingService) {
-        this.routingService = routingService;
-    }
-
-    @Override
-    public RoutingService.Builder statsReceiver(StatsReceiver statsReceiver) {
-        return this;
-    }
-
-    @Override
-    public RoutingService build() {
-        return routingService;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingUtils.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingUtils.java
deleted file mode 100644
index 8e8edd3..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingUtils.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.routing;
-
-import com.twitter.common.zookeeper.ServerSet;
-import java.net.SocketAddress;
-
-/**
- * Utils for routing services.
- */
-public class RoutingUtils {
-
-    private static final int NUM_CONSISTENT_HASH_REPLICAS = 997;
-
-    /**
-     * Building routing service from <code>finagleNameStr</code>.
-     *
-     * @param finagleNameStr
-     *          finagle name str of a service
-     * @return routing service builder
-     */
-    public static RoutingService.Builder buildRoutingService(String finagleNameStr) {
-        if (!finagleNameStr.startsWith("serverset!")
-                && !finagleNameStr.startsWith("inet!")
-                && !finagleNameStr.startsWith("zk!")) {
-            // We only support serverset based names at the moment
-            throw new UnsupportedOperationException("Finagle Name format not supported for name: " + finagleNameStr);
-        }
-        return buildRoutingService(new NameServerSet(finagleNameStr), true);
-    }
-
-    /**
-     * Building routing service from <code>serverSet</code>.
-     *
-     * @param serverSet
-     *          server set of a service
-     * @return routing service builder
-     */
-    public static RoutingService.Builder buildRoutingService(ServerSet serverSet) {
-        return buildRoutingService(serverSet, false);
-    }
-
-    /**
-     * Building routing service from <code>address</code>.
-     *
-     * @param address
-     *          host to route the requests
-     * @return routing service builder
-     */
-    public static RoutingService.Builder buildRoutingService(SocketAddress address) {
-        return SingleHostRoutingService.newBuilder().address(address);
-    }
-
-    /**
-     * Build routing service builder of a routing service <code>routingService</code>.
-     *
-     * @param routingService
-     *          routing service to provide
-     * @return routing service builder
-     */
-    public static RoutingService.Builder buildRoutingService(RoutingService routingService) {
-        return new RoutingServiceProvider(routingService);
-    }
-
-    private static RoutingService.Builder buildRoutingService(ServerSet serverSet,
-                                                              boolean resolveFromName) {
-        return ConsistentHashRoutingService.newBuilder()
-                .serverSet(serverSet)
-                .resolveFromName(resolveFromName)
-                .numReplicas(NUM_CONSISTENT_HASH_REPLICAS);
-    }
-
-}


Mime
View raw message