distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [39/51] [partial] incubator-distributedlog git commit: DL-4: Repackage the source under apache namespace
Date Thu, 05 Jan 2017 00:51:44 GMT
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/org/apache/distributedlog/client/TestDistributedLogMultiStreamWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/TestDistributedLogMultiStreamWriter.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/TestDistributedLogMultiStreamWriter.java
new file mode 100644
index 0000000..d7494de
--- /dev/null
+++ b/distributedlog-client/src/test/java/org/apache/distributedlog/client/TestDistributedLogMultiStreamWriter.java
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.LogRecordSet;
+import org.apache.distributedlog.LogRecordSetBuffer;
+import org.apache.distributedlog.exceptions.LogRecordTooLongException;
+import org.apache.distributedlog.io.CompressionCodec;
+import org.apache.distributedlog.service.DistributedLogClient;
+import com.twitter.finagle.IndividualRequestTimeoutException;
+import com.twitter.util.Await;
+import com.twitter.util.Future;
+import com.twitter.util.Promise;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test {@link DistributedLogMultiStreamWriter}.
+ */
+public class TestDistributedLogMultiStreamWriter {
+
+    @Test(timeout = 20000, expected = IllegalArgumentException.class)
+    public void testBuildWithNullStreams() throws Exception { // streams(...) is never called on the builder
+        DistributedLogMultiStreamWriter.newBuilder()
+                .build(); // the test passes only if an IllegalArgumentException is thrown
+    }
+
+    @Test(timeout = 20000, expected = IllegalArgumentException.class)
+    public void testBuildWithEmptyStreamList() throws Exception {
+        DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.<String>newArrayList()) // a stream list is supplied, but it is empty
+                .build(); // the test passes only if an IllegalArgumentException is thrown
+    }
+
+    @Test(timeout = 20000, expected = NullPointerException.class)
+    public void testBuildWithNullClient() throws Exception { // client(...) is never called on the builder
+        DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .build(); // the test passes only if a NullPointerException is thrown
+    }
+
+    @Test(timeout = 20000, expected = NullPointerException.class)
+    public void testBuildWithNullCodec() throws Exception {
+        DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .client(mock(DistributedLogClient.class))
+                .compressionCodec(null) // streams and client are valid; the null codec is the only bad input
+                .build(); // the test passes only if a NullPointerException is thrown
+    }
+
+    @Test(timeout = 20000, expected = IllegalArgumentException.class)
+    public void testBuildWithInvalidSpeculativeSettings1()
+            throws Exception {
+        DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .client(mock(DistributedLogClient.class))
+                .compressionCodec(CompressionCodec.Type.LZ4)
+                .firstSpeculativeTimeoutMs(-1) // invalid input under test: negative first speculative timeout
+                .build(); // the test passes only if an IllegalArgumentException is thrown
+    }
+
+    @Test(timeout = 20000, expected = IllegalArgumentException.class)
+    public void testBuildWithInvalidSpeculativeSettings2()
+            throws Exception {
+        DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .client(mock(DistributedLogClient.class))
+                .compressionCodec(CompressionCodec.Type.LZ4)
+                .firstSpeculativeTimeoutMs(10)
+                .maxSpeculativeTimeoutMs(5) // invalid input under test: max (5) is smaller than first (10)
+                .build(); // the test passes only if an IllegalArgumentException is thrown
+    }
+
+    @Test(timeout = 20000, expected = IllegalArgumentException.class)
+    public void testBuildWithInvalidSpeculativeSettings3()
+            throws Exception {
+        DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .client(mock(DistributedLogClient.class))
+                .compressionCodec(CompressionCodec.Type.LZ4)
+                .firstSpeculativeTimeoutMs(10)
+                .maxSpeculativeTimeoutMs(20)
+                .speculativeBackoffMultiplier(-1) // invalid input under test: negative backoff multiplier
+                .build(); // the test passes only if an IllegalArgumentException is thrown
+    }
+
+    @Test(timeout = 20000, expected = IllegalArgumentException.class)
+    public void testBuildWithInvalidSpeculativeSettings4()
+            throws Exception {
+        DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .client(mock(DistributedLogClient.class))
+                .compressionCodec(CompressionCodec.Type.LZ4)
+                .firstSpeculativeTimeoutMs(10)
+                .maxSpeculativeTimeoutMs(20)
+                .speculativeBackoffMultiplier(2)
+                .requestTimeoutMs(10) // NOTE(review): presumably rejected because 10 is below the max speculative timeout (20) - confirm
+                .build(); // the test passes only if an IllegalArgumentException is thrown
+    }
+
+    /**
+     * Builds a writer from a complete set of settings (two streams, a mock client,
+     * LZ4, speculative timeouts 10/20 with multiplier 2, request timeout 50).
+     * The test fails only if {@code build()} or {@code close()} throws.
+     */
+    @Test(timeout = 20000)
+    public void testBuildMultiStreamWriter()
+            throws Exception {
+        DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .client(mock(DistributedLogClient.class))
+                .compressionCodec(CompressionCodec.Type.LZ4)
+                .firstSpeculativeTimeoutMs(10)
+                .maxSpeculativeTimeoutMs(20)
+                .speculativeBackoffMultiplier(2)
+                .requestTimeoutMs(50)
+                .build();
+        // Reaching this point means build() accepted the settings. Release the
+        // writer the same way the other tests in this class do.
+        writer.close();
+    }
+
+    @Test(timeout = 20000)
+    public void testBuildWithPeriodicalFlushEnabled() throws Exception {
+        ScheduledExecutorService executorService = mock(ScheduledExecutorService.class); // mock, so scheduling calls can be verified
+        DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .client(mock(DistributedLogClient.class))
+                .compressionCodec(CompressionCodec.Type.LZ4)
+                .firstSpeculativeTimeoutMs(10)
+                .maxSpeculativeTimeoutMs(20)
+                .speculativeBackoffMultiplier(2)
+                .requestTimeoutMs(50)
+                .flushIntervalMs(1000) // non-zero interval: periodic flush is expected to be scheduled
+                .scheduler(executorService)
+                .build();
+        verify(executorService, times(1)).scheduleAtFixedRate(writer, 1000000, 1000000, TimeUnit.MICROSECONDS); // the writer itself is the task; 1000 ms is expected as 1,000,000 microseconds
+    }
+
+    /**
+     * With a flush interval of 0, building the writer must not schedule any
+     * fixed-rate task on the supplied scheduler.
+     */
+    @Test(timeout = 20000)
+    public void testBuildWithPeriodicalFlushDisabled() throws Exception {
+        ScheduledExecutorService executorService = mock(ScheduledExecutorService.class);
+        DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .client(mock(DistributedLogClient.class))
+                .compressionCodec(CompressionCodec.Type.LZ4)
+                .firstSpeculativeTimeoutMs(10)
+                .maxSpeculativeTimeoutMs(20)
+                .speculativeBackoffMultiplier(2)
+                .requestTimeoutMs(50)
+                .flushIntervalMs(0)
+                .scheduler(executorService)
+                .build();
+        // Match on any arguments. Verifying the absence of one specific
+        // (task, delay, period, unit) combination would still pass if a flush
+        // had been scheduled with different values.
+        verify(executorService, Mockito.never()).scheduleAtFixedRate(
+                any(Runnable.class), Mockito.anyLong(), Mockito.anyLong(), any(TimeUnit.class));
+        writer.close();
+    }
+
+    /**
+     * With {@code bufferSize(0)} and periodic flush disabled, a single write is
+     * expected to reach the client as exactly one record set, with no other trigger.
+     */
+    @Test(timeout = 20000)
+    public void testFlushWhenBufferIsFull() throws Exception {
+        DistributedLogClient client = mock(DistributedLogClient.class);
+        when(client.writeRecordSet((String) any(), (LogRecordSetBuffer) any()))
+                .thenReturn(Future.value(new DLSN(1L, 1L, 999L)));
+
+        ScheduledExecutorService executorService =
+                Executors.newSingleThreadScheduledExecutor();
+        try {
+            DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
+                    .streams(Lists.newArrayList("stream1", "stream2"))
+                    .client(client)
+                    .compressionCodec(CompressionCodec.Type.LZ4)
+                    .firstSpeculativeTimeoutMs(100000)
+                    .maxSpeculativeTimeoutMs(200000)
+                    .speculativeBackoffMultiplier(2)
+                    .requestTimeoutMs(500000)
+                    .flushIntervalMs(0)
+                    .bufferSize(0)
+                    .scheduler(executorService)
+                    .build();
+
+            ByteBuffer buffer = ByteBuffer.wrap("test".getBytes(UTF_8));
+            writer.write(buffer);
+
+            // The record set must already have been handed to the client.
+            verify(client, times(1)).writeRecordSet((String) any(), (LogRecordSetBuffer) any());
+
+            writer.close();
+        } finally {
+            // This test created the executor, so it also stops it; any worker
+            // thread it started would otherwise outlive the test. shutdown()
+            // is harmless if the executor has already been shut down.
+            executorService.shutdown();
+        }
+    }
+
+    /**
+     * Writes two records that each nearly fill the maximum record size. The first
+     * must stay buffered; the second must cause one record set to be sent to the
+     * client and end up alone in a fresh record set writer.
+     */
+    @Test(timeout = 20000)
+    public void testFlushWhenExceedMaxLogRecordSetSize()
+            throws Exception {
+        DistributedLogClient client = mock(DistributedLogClient.class);
+        when(client.writeRecordSet((String) any(), (LogRecordSetBuffer) any()))
+                .thenReturn(Future.value(new DLSN(1L, 1L, 999L)));
+        ScheduledExecutorService executorService =
+                Executors.newSingleThreadScheduledExecutor();
+        try {
+            DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
+                    .streams(Lists.newArrayList("stream1", "stream2"))
+                    .client(client)
+                    .compressionCodec(CompressionCodec.Type.LZ4)
+                    .firstSpeculativeTimeoutMs(100000)
+                    .maxSpeculativeTimeoutMs(200000)
+                    .speculativeBackoffMultiplier(2)
+                    .requestTimeoutMs(500000)
+                    .flushIntervalMs(0)
+                    .bufferSize(Integer.MAX_VALUE)
+                    .scheduler(executorService)
+                    .build();
+
+            // First record: nothing is sent yet, it sits in the current record set.
+            byte[] data = new byte[LogRecord.MAX_LOGRECORD_SIZE - 3 * 100];
+            ByteBuffer buffer1 = ByteBuffer.wrap(data);
+            writer.write(buffer1);
+            verify(client, times(0)).writeRecordSet((String) any(), (LogRecordSetBuffer) any());
+            LogRecordSet.Writer recordSetWriter1 = writer.getLogRecordSetWriter();
+            assertEquals(1, recordSetWriter1.getNumRecords());
+            // NOTE(review): the extra 4 bytes are presumably a per-record length prefix - confirm.
+            assertEquals(LogRecordSet.HEADER_LEN + 4 + data.length, recordSetWriter1.getNumBytes());
+
+            // Second record: exactly one record set has been sent, and the
+            // writer now exposes a different record set writer holding this record.
+            ByteBuffer buffer2 = ByteBuffer.wrap(data);
+            writer.write(buffer2);
+            verify(client, times(1)).writeRecordSet((String) any(), (LogRecordSetBuffer) any());
+            LogRecordSet.Writer recordSetWriter2 = writer.getLogRecordSetWriter();
+            assertEquals(1, recordSetWriter2.getNumRecords());
+            assertEquals(LogRecordSet.HEADER_LEN + 4 + data.length, recordSetWriter2.getNumBytes());
+            assertTrue(recordSetWriter1 != recordSetWriter2);
+
+            writer.close();
+        } finally {
+            // This test created the executor, so it also stops it; any worker
+            // thread it started would otherwise outlive the test. shutdown()
+            // is harmless if the executor has already been shut down.
+            executorService.shutdown();
+        }
+    }
+
+    /**
+     * Writing a payload larger than {@code LogRecord.MAX_LOGRECORD_SIZE} must return
+     * an already-defined future that fails with {@link LogRecordTooLongException}.
+     */
+    @Test(timeout = 20000)
+    public void testWriteTooLargeRecord() throws Exception {
+        DistributedLogClient mockClient = mock(DistributedLogClient.class);
+        DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .client(mockClient)
+                .compressionCodec(CompressionCodec.Type.LZ4)
+                .firstSpeculativeTimeoutMs(100000)
+                .maxSpeculativeTimeoutMs(200000)
+                .speculativeBackoffMultiplier(2)
+                .requestTimeoutMs(5000000)
+                .flushIntervalMs(0)
+                .bufferSize(0)
+                .build();
+
+        // Ten bytes over the maximum record size.
+        ByteBuffer oversized = ByteBuffer.wrap(new byte[LogRecord.MAX_LOGRECORD_SIZE + 10]);
+        Future<DLSN> result = writer.write(oversized);
+        // The outcome must be available before anything awaits the future.
+        assertTrue(result.isDefined());
+        try {
+            Await.result(result);
+            fail("Should fail on writing too long record");
+        } catch (LogRecordTooLongException expected) {
+            // expected
+        }
+        writer.close();
+    }
+
+    /**
+     * The mock client completes writes only for the stream at index 1; writes to
+     * any other stream return a promise that is never satisfied. The write must
+     * still complete with the DLSN returned for that second stream.
+     */
+    @Test(timeout = 20000)
+    public void testSpeculativeWrite() throws Exception {
+        DistributedLogClient client = mock(DistributedLogClient.class);
+        DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .client(client)
+                .compressionCodec(CompressionCodec.Type.LZ4)
+                .firstSpeculativeTimeoutMs(10)
+                .maxSpeculativeTimeoutMs(20)
+                .speculativeBackoffMultiplier(2)
+                .requestTimeoutMs(5000000)
+                .flushIntervalMs(0)
+                .bufferSize(0)
+                .build();
+
+        final String secondStream = writer.getStream(1);
+
+        final DLSN dlsn = new DLSN(99L, 88L, 0L);
+
+        // Typed Answer (instead of the raw type) so the stubbed return type is
+        // checked by the compiler.
+        Mockito.doAnswer(new Answer<Future<DLSN>>() {
+            @Override
+            public Future<DLSN> answer(InvocationOnMock invocation) throws Throwable {
+                Object[] arguments = invocation.getArguments();
+                String stream = (String) arguments[0];
+                if (stream.equals(secondStream)) {
+                    return Future.value(dlsn);
+                } else {
+                    // Never satisfied: this stream does not answer at all.
+                    return new Promise<DLSN>();
+                }
+            }
+        }).when(client).writeRecordSet((String) any(), (LogRecordSetBuffer) any());
+
+        byte[] data = "test-test".getBytes(UTF_8);
+        ByteBuffer buffer = ByteBuffer.wrap(data);
+        Future<DLSN> writeFuture = writer.write(buffer);
+        DLSN writeDLSN = Await.result(writeFuture);
+        assertEquals(dlsn, writeDLSN);
+        writer.close();
+    }
+
+    /**
+     * With a 10 ms flush interval and {@code bufferSize(Integer.MAX_VALUE)}, a
+     * single small write must still complete with the DLSN the client returns.
+     * NOTE(review): the huge buffer size presumably rules out a buffer-full flush,
+     * leaving the periodic flush as the only trigger - confirm.
+     */
+    @Test(timeout = 20000)
+    public void testPeriodicalFlush() throws Exception {
+        DistributedLogClient client = mock(DistributedLogClient.class);
+        DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .client(client)
+                .compressionCodec(CompressionCodec.Type.LZ4)
+                .firstSpeculativeTimeoutMs(10)
+                .maxSpeculativeTimeoutMs(20)
+                .speculativeBackoffMultiplier(2)
+                .requestTimeoutMs(5000000)
+                .flushIntervalMs(10)
+                .bufferSize(Integer.MAX_VALUE)
+                .build();
+
+        final DLSN dlsn = new DLSN(99L, 88L, 0L);
+
+        // Every record-set write, on any stream, succeeds with the same DLSN.
+        when(client.writeRecordSet((String) any(), (LogRecordSetBuffer) any()))
+                .thenReturn(Future.value(dlsn));
+
+        byte[] data = "test-test".getBytes(UTF_8);
+        ByteBuffer buffer = ByteBuffer.wrap(data);
+        Future<DLSN> writeFuture = writer.write(buffer);
+        DLSN writeDLSN = Await.result(writeFuture);
+        assertEquals(dlsn, writeDLSN);
+        writer.close();
+    }
+
+    /**
+     * When the client answers every write with a future that never completes,
+     * the write must fail with {@link IndividualRequestTimeoutException} whose
+     * timeout is at least 30 ms (10 + 20) and below the 5000000 ms request timeout.
+     */
+    @Test(timeout = 20000)
+    public void testFailRequestAfterRetriedAllStreams() throws Exception {
+        DistributedLogClient mockClient = mock(DistributedLogClient.class);
+        // A promise that is never satisfied: no stream ever acknowledges the write.
+        when(mockClient.writeRecordSet((String) any(), (LogRecordSetBuffer) any()))
+                .thenReturn(new Promise<DLSN>());
+        DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .client(mockClient)
+                .compressionCodec(CompressionCodec.Type.LZ4)
+                .firstSpeculativeTimeoutMs(10)
+                .maxSpeculativeTimeoutMs(20)
+                .speculativeBackoffMultiplier(2)
+                .requestTimeoutMs(5000000)
+                .flushIntervalMs(10)
+                .bufferSize(Integer.MAX_VALUE)
+                .build();
+
+        Future<DLSN> pendingWrite = writer.write(ByteBuffer.wrap("test-test".getBytes(UTF_8)));
+        try {
+            Await.result(pendingWrite);
+            fail("Should fail the request after retries all streams");
+        } catch (IndividualRequestTimeoutException timedOut) {
+            long reportedMs = timedOut.timeout().inMilliseconds();
+            boolean coversSpeculativeTimeouts = reportedMs >= (10 + 20);
+            boolean belowRequestTimeout = reportedMs < 5000000;
+            assertTrue(coversSpeculativeTimeouts && belowRequestTimeout);
+        }
+        writer.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java
new file mode 100644
index 0000000..86d1c11
--- /dev/null
+++ b/distributedlog-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.ownership;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.distributedlog.client.ClientConfig;
+import com.twitter.finagle.stats.NullStatsReceiver;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.Set;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Test Case for Ownership Cache.
+ */
+public class TestOwnershipCache {
+
+    @Rule
+    public TestName runtime = new TestName();
+
+    /**
+     * Creates a fresh cache for one test: default {@link ClientConfig}, {@code null}
+     * for the second constructor argument, and null stats receivers.
+     */
+    private static OwnershipCache createOwnershipCache() {
+        return new OwnershipCache(
+                new ClientConfig(),
+                null,
+                NullStatsReceiver.get(),
+                NullStatsReceiver.get());
+    }
+
+    private static SocketAddress createSocketAddress(int port) {
+        return new InetSocketAddress("127.0.0.1", port);
+    }
+
+    /**
+     * Updating the owner must succeed both when the stream has no owner yet and
+     * when the same owner is set again; the cache must report that owner each time.
+     */
+    @Test(timeout = 60000)
+    public void testUpdateOwner() {
+        final OwnershipCache ownershipCache = createOwnershipCache();
+        final SocketAddress owner = createSocketAddress(1000);
+        final String streamName = runtime.getMethodName();
+        final String ownerMessage = "Owner should be " + owner + " for stream " + streamName;
+
+        // First update: the stream has no owner yet.
+        assertTrue("Should successfully update owner if no owner exists before",
+                ownershipCache.updateOwner(streamName, owner));
+        assertEquals(ownerMessage, owner, ownershipCache.getOwner(streamName));
+
+        // Second update: the same owner again.
+        assertTrue("Should successfully update owner if old owner is same",
+                ownershipCache.updateOwner(streamName, owner));
+        assertEquals(ownerMessage, owner, ownershipCache.getOwner(streamName));
+    }
+
+    @Test(timeout = 60000)
+    public void testRemoveOwnerFromStream() { // removal must only take effect when the given address is the stream's current owner
+        OwnershipCache cache = createOwnershipCache();
+        int initialPort = 2000;
+        int numProxies = 2;
+        int numStreamsPerProxy = 2;
+        for (int i = 0; i < numProxies; i++) { // one address per proxy, on ports initialPort, initialPort + 1, ...
+            SocketAddress addr = createSocketAddress(initialPort + i);
+            for (int j = 0; j < numStreamsPerProxy; j++) {
+                String stream = runtime.getMethodName() + "_" + i + "_" + j; // stream name encodes proxy index i and stream index j
+                cache.updateOwner(stream, addr);
+            }
+        }
+        Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping();
+        assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
+                numProxies * numStreamsPerProxy, ownershipMap.size());
+        Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution();
+        assertEquals("There should be " + numProxies + " proxies cached",
+                numProxies, ownershipDistribution.size());
+
+        String stream = runtime.getMethodName() + "_0_0"; // first stream registered for the first proxy
+        SocketAddress owner = createSocketAddress(initialPort); // same host and port as the first proxy's address
+
+        // remove non-existent mapping won't change anything
+        SocketAddress nonExistentAddr = createSocketAddress(initialPort + 999);
+        cache.removeOwnerFromStream(stream, nonExistentAddr, "remove-non-existent-addr");
+        assertEquals("Owner " + owner + " should not be removed",
+                owner, cache.getOwner(stream));
+        ownershipMap = cache.getStreamOwnerMapping();
+        assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
+                numProxies * numStreamsPerProxy, ownershipMap.size());
+
+        // remove existent mapping should remove ownership mapping
+        cache.removeOwnerFromStream(stream, owner, "remove-owner");
+        assertNull("Owner " + owner + " should be removed", cache.getOwner(stream));
+        ownershipMap = cache.getStreamOwnerMapping();
+        assertEquals("There should be " + (numProxies * numStreamsPerProxy - 1) + " entries left in cache",
+                numProxies * numStreamsPerProxy - 1, ownershipMap.size());
+        ownershipDistribution = cache.getStreamOwnershipDistribution();
+        assertEquals("There should still be " + numProxies + " proxies cached", // the owner keeps its other stream, so it stays listed
+                numProxies, ownershipDistribution.size());
+        Set<String> ownedStreams = ownershipDistribution.get(owner);
+        assertEquals("There should be only " + (numStreamsPerProxy - 1) + " streams owned for " + owner,
+                numStreamsPerProxy - 1, ownedStreams.size());
+        assertFalse("Stream " + stream + " should not be owned by " + owner,
+                ownedStreams.contains(stream));
+    }
+
+    @Test(timeout = 60000)
+    public void testRemoveAllStreamsFromOwner() { // removing a cached owner must drop all of its streams and the owner entry itself
+        OwnershipCache cache = createOwnershipCache();
+        int initialPort = 2000;
+        int numProxies = 2;
+        int numStreamsPerProxy = 2;
+        for (int i = 0; i < numProxies; i++) { // one address per proxy, on ports initialPort, initialPort + 1, ...
+            SocketAddress addr = createSocketAddress(initialPort + i);
+            for (int j = 0; j < numStreamsPerProxy; j++) {
+                String stream = runtime.getMethodName() + "_" + i + "_" + j; // stream name encodes proxy index i and stream index j
+                cache.updateOwner(stream, addr);
+            }
+        }
+        Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping();
+        assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
+                numProxies * numStreamsPerProxy, ownershipMap.size());
+        Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution();
+        assertEquals("There should be " + numProxies + " proxies cached",
+                numProxies, ownershipDistribution.size());
+
+        SocketAddress owner = createSocketAddress(initialPort); // same host and port as the first proxy's address
+
+        // remove non-existent host won't change anything
+        SocketAddress nonExistentAddr = createSocketAddress(initialPort + 999);
+        cache.removeAllStreamsFromOwner(nonExistentAddr);
+        ownershipMap = cache.getStreamOwnerMapping();
+        assertEquals("There should still be " + (numProxies * numStreamsPerProxy) + " entries in cache",
+                numProxies * numStreamsPerProxy, ownershipMap.size());
+        ownershipDistribution = cache.getStreamOwnershipDistribution();
+        assertEquals("There should still be " + numProxies + " proxies cached",
+                numProxies, ownershipDistribution.size());
+
+        // remove existent host should remove ownership mapping
+        cache.removeAllStreamsFromOwner(owner);
+        ownershipMap = cache.getStreamOwnerMapping();
+        assertEquals("There should be " + ((numProxies - 1) * numStreamsPerProxy) + " entries left in cache", // only the other proxy's streams remain
+                (numProxies - 1) * numStreamsPerProxy, ownershipMap.size());
+        ownershipDistribution = cache.getStreamOwnershipDistribution();
+        assertEquals("There should be " + (numProxies - 1) + " proxies cached",
+                numProxies - 1, ownershipDistribution.size());
+        assertFalse("Host " + owner + " should not be cached",
+                ownershipDistribution.containsKey(owner));
+    }
+
+    /**
+     * Re-assigning one stream to a new address must move that stream from the old
+     * owner to the new one: the total number of cached streams is unchanged, the
+     * old owner loses exactly that stream, and the new owner appears as an extra
+     * proxy owning only that stream.
+     */
+    @Test(timeout = 60000)
+    public void testReplaceOwner() {
+        OwnershipCache cache = createOwnershipCache();
+        int initialPort = 2000;
+        int numProxies = 2;
+        int numStreamsPerProxy = 2;
+        for (int i = 0; i < numProxies; i++) {
+            SocketAddress addr = createSocketAddress(initialPort + i);
+            for (int j = 0; j < numStreamsPerProxy; j++) {
+                String stream = runtime.getMethodName() + "_" + i + "_" + j;
+                cache.updateOwner(stream, addr);
+            }
+        }
+        Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping();
+        assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
+                numProxies * numStreamsPerProxy, ownershipMap.size());
+        Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution();
+        assertEquals("There should be " + numProxies + " proxies cached",
+                numProxies, ownershipDistribution.size());
+
+        String stream = runtime.getMethodName() + "_0_0";
+        SocketAddress oldOwner = createSocketAddress(initialPort);
+        SocketAddress newOwner = createSocketAddress(initialPort + 999);
+
+        cache.updateOwner(stream, newOwner);
+        assertEquals("Owner of " + stream + " should be changed from " + oldOwner + " to " + newOwner,
+                newOwner, cache.getOwner(stream));
+        ownershipMap = cache.getStreamOwnerMapping();
+        assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
+                numProxies * numStreamsPerProxy, ownershipMap.size());
+        assertEquals("Owner of " + stream + " should be " + newOwner,
+                newOwner, ownershipMap.get(stream));
+        ownershipDistribution = cache.getStreamOwnershipDistribution();
+        assertEquals("There should be " + (numProxies + 1) + " proxies cached",
+                numProxies + 1, ownershipDistribution.size());
+        Set<String> oldOwnedStreams = ownershipDistribution.get(oldOwner);
+        assertEquals("There should be only " + (numStreamsPerProxy - 1) + " streams owned by " + oldOwner,
+                numStreamsPerProxy - 1, oldOwnedStreams.size());
+        assertFalse("Stream " + stream + " should not be owned by " + oldOwner,
+                oldOwnedStreams.contains(stream));
+        Set<String> newOwnedStreams = ownershipDistribution.get(newOwner);
+        // The new owner received exactly the one re-assigned stream, regardless
+        // of numStreamsPerProxy, so the message states that fixed count.
+        assertEquals("There should be only 1 stream owned by " + newOwner,
+                1, newOwnedStreams.size());
+        assertTrue("Stream " + stream + " should be owned by " + newOwner,
+                newOwnedStreams.contains(stream));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java
new file mode 100644
index 0000000..8ef33bd
--- /dev/null
+++ b/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.proxy;
+
+import org.apache.distributedlog.thrift.service.BulkWriteResponse;
+import org.apache.distributedlog.thrift.service.ClientInfo;
+import org.apache.distributedlog.thrift.service.DistributedLogService;
+import org.apache.distributedlog.thrift.service.HeartbeatOptions;
+import org.apache.distributedlog.thrift.service.ServerInfo;
+import org.apache.distributedlog.thrift.service.WriteContext;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+import com.twitter.util.Future;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Mock implementations of {@link DistributedLogService.ServiceIface} for client tests.
+ */
+public class MockDistributedLogServices {
+
+    /**
+     * Mock service in which every call returns a future built with {@code Future.value}
+     * around a newly constructed, unpopulated response object.
+     */
+    static class MockBasicService implements DistributedLogService.ServiceIface {
+
+        /** Wraps a new, unpopulated {@link WriteResponse} in a future. */
+        private static Future<WriteResponse> emptyWriteResponse() {
+            return Future.value(new WriteResponse());
+        }
+
+        /** Wraps a new, unpopulated {@link ServerInfo} in a future. */
+        private static Future<ServerInfo> emptyServerInfo() {
+            return Future.value(new ServerInfo());
+        }
+
+        @Override
+        public Future<ServerInfo> handshake() {
+            return emptyServerInfo();
+        }
+
+        @Override
+        public Future<ServerInfo> handshakeWithClientInfo(ClientInfo clientInfo) {
+            return emptyServerInfo();
+        }
+
+        @Override
+        public Future<WriteResponse> heartbeat(String stream, WriteContext ctx) {
+            return emptyWriteResponse();
+        }
+
+        @Override
+        public Future<WriteResponse> heartbeatWithOptions(
+                String stream, WriteContext ctx, HeartbeatOptions options) {
+            return emptyWriteResponse();
+        }
+
+        @Override
+        public Future<WriteResponse> write(String stream, ByteBuffer data) {
+            return emptyWriteResponse();
+        }
+
+        @Override
+        public Future<WriteResponse> writeWithContext(
+                String stream, ByteBuffer data, WriteContext ctx) {
+            return emptyWriteResponse();
+        }
+
+        @Override
+        public Future<BulkWriteResponse> writeBulkWithContext(
+                String stream, List<ByteBuffer> data, WriteContext ctx) {
+            return Future.value(new BulkWriteResponse());
+        }
+
+        @Override
+        public Future<WriteResponse> truncate(String stream, String dlsn, WriteContext ctx) {
+            return emptyWriteResponse();
+        }
+
+        @Override
+        public Future<WriteResponse> release(String stream, WriteContext ctx) {
+            return emptyWriteResponse();
+        }
+
+        @Override
+        public Future<WriteResponse> create(String stream, WriteContext ctx) {
+            return emptyWriteResponse();
+        }
+
+        @Override
+        public Future<WriteResponse> delete(String stream, WriteContext ctx) {
+            return emptyWriteResponse();
+        }
+
+        @Override
+        public Future<WriteResponse> getOwner(String stream, WriteContext ctx) {
+            return emptyWriteResponse();
+        }
+
+        @Override
+        public Future<Void> setAcceptNewStream(boolean enabled) {
+            return Future.value(null);
+        }
+    }
+
+    /**
+     * Mock service whose two handshake calls answer with a {@link ServerInfo} that the
+     * test can swap out via {@link #updateServerInfo(ServerInfo)}.
+     */
+    public static class MockServerInfoService extends MockBasicService {
+
+        /** Server info handed back by both handshake variants. */
+        protected ServerInfo serverInfo;
+
+        /** Starts out with a new, unpopulated {@link ServerInfo}. */
+        public MockServerInfoService() {
+            this.serverInfo = new ServerInfo();
+        }
+
+        /**
+         * Replaces the server info returned by subsequent handshakes.
+         *
+         * @param serverInfo the server info to return from now on
+         */
+        public void updateServerInfo(ServerInfo serverInfo) {
+            this.serverInfo = serverInfo;
+        }
+
+        @Override
+        public Future<ServerInfo> handshake() {
+            return Future.value(this.serverInfo);
+        }
+
+        @Override
+        public Future<ServerInfo> handshakeWithClientInfo(ClientInfo clientInfo) {
+            return Future.value(this.serverInfo);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java
new file mode 100644
index 0000000..e38c2ed
--- /dev/null
+++ b/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.proxy;
+
+import org.apache.distributedlog.thrift.service.DistributedLogService;
+import java.net.SocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * {@link ProxyClient.Builder} for tests: instead of constructing clients it hands back
+ * the mock client that was registered for the requested address.
+ */
+class MockProxyClientBuilder implements ProxyClient.Builder {
+
+    /** Registered mock clients, keyed by the address they were provided for. */
+    private final ConcurrentMap<SocketAddress, MockProxyClient> clients =
+            new ConcurrentHashMap<SocketAddress, MockProxyClient>();
+
+    /**
+     * Proxy client wired to a {@link MockThriftClient} and a caller-supplied service.
+     */
+    static class MockProxyClient extends ProxyClient {
+        MockProxyClient(SocketAddress address,
+                        DistributedLogService.ServiceIface service) {
+            super(address, new MockThriftClient(), service);
+        }
+    }
+
+    /**
+     * Registers the client that {@link #build(SocketAddress)} returns for {@code address},
+     * replacing any client registered earlier for the same address.
+     *
+     * @param address the address the client stands for
+     * @param proxyClient the mock client to hand out
+     */
+    public void provideProxyClient(SocketAddress address,
+                                   MockProxyClient proxyClient) {
+        this.clients.put(address, proxyClient);
+    }
+
+    /**
+     * Looks up the mock client registered for {@code address}.
+     *
+     * @return the registered client, or {@code null} when none was provided
+     */
+    @Override
+    public ProxyClient build(SocketAddress address) {
+        final MockProxyClient registered = this.clients.get(address);
+        return registered;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java
new file mode 100644
index 0000000..ad1c878
--- /dev/null
+++ b/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.proxy;
+
+import com.twitter.finagle.Service;
+import com.twitter.finagle.thrift.ThriftClientRequest;
+import com.twitter.util.Future;
+
+/**
+ * Thrift client stub that answers each request with the request's own message bytes.
+ */
+class MockThriftClient extends Service<ThriftClientRequest, byte[]> {
+
+    /**
+     * Echoes the request payload back.
+     *
+     * @param request the thrift request
+     * @return a future built with {@code Future.value} from {@code request.message}
+     */
+    @Override
+    public Future<byte[]> apply(ThriftClientRequest request) {
+        final byte[] payload = request.message;
+        return Future.value(payload);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java
new file mode 100644
index 0000000..6d9a471
--- /dev/null
+++ b/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.proxy;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.distributedlog.client.ClientConfig;
+import org.apache.distributedlog.client.proxy.MockDistributedLogServices.MockBasicService;
+import org.apache.distributedlog.client.proxy.MockDistributedLogServices.MockServerInfoService;
+import org.apache.distributedlog.client.proxy.MockProxyClientBuilder.MockProxyClient;
+import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
+import org.apache.distributedlog.client.stats.ClientStats;
+import org.apache.distributedlog.thrift.service.ServerInfo;
+import com.twitter.finagle.stats.NullStatsReceiver;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.tuple.Pair;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Tests for {@link ProxyClientManager}: client creation and removal, the handshake
+ * issued when a client is created, explicit handshake and periodic handshake.
+ */
+public class TestProxyClientManager {
+
+    @Rule
+    public TestName runtime = new TestName();
+
+    /**
+     * {@link HostProvider} over a set of hosts that tests grow through {@code addHost}.
+     */
+    static class TestHostProvider implements HostProvider {
+
+        Set<SocketAddress> hosts = new HashSet<SocketAddress>();
+
+        synchronized void addHost(SocketAddress host) {
+            hosts.add(host);
+        }
+
+        /** Returns an immutable copy, so the returned set is unaffected by later additions. */
+        @Override
+        public synchronized Set<SocketAddress> getHosts() {
+            return ImmutableSet.copyOf(hosts);
+        }
+
+    }
+
+    /**
+     * Creates a manager backed by a new, empty {@link TestHostProvider}.
+     */
+    private static ProxyClientManager createProxyClientManager(ProxyClient.Builder builder,
+                                                               long periodicHandshakeIntervalMs) {
+        HostProvider provider = new TestHostProvider();
+        return createProxyClientManager(builder, provider, periodicHandshakeIntervalMs);
+    }
+
+    /**
+     * Creates a manager with the given builder, host provider and periodic handshake
+     * interval. The periodic ownership sync interval is set to -1 (presumably turning
+     * that feature off -- confirm against {@link ClientConfig}).
+     */
+    private static ProxyClientManager createProxyClientManager(ProxyClient.Builder builder,
+                                                               HostProvider hostProvider,
+                                                               long periodicHandshakeIntervalMs) {
+        ClientConfig clientConfig = new ClientConfig();
+        clientConfig.setPeriodicHandshakeIntervalMs(periodicHandshakeIntervalMs);
+        clientConfig.setPeriodicOwnershipSyncIntervalMs(-1);
+        HashedWheelTimer dlTimer = new HashedWheelTimer(
+                new ThreadFactoryBuilder().setNameFormat("TestProxyClientManager-timer-%d").build(),
+                clientConfig.getRedirectBackoffStartMs(),
+                TimeUnit.MILLISECONDS);
+        return new ProxyClientManager(clientConfig, builder, dlTimer, hostProvider,
+                new ClientStats(NullStatsReceiver.get(), false, new DefaultRegionResolver()));
+    }
+
+    private static SocketAddress createSocketAddress(int port) {
+        return new InetSocketAddress("127.0.0.1", port);
+    }
+
+    /** Creates a mock client backed by a {@link MockBasicService}. */
+    private static MockProxyClient createMockProxyClient(SocketAddress address) {
+        return new MockProxyClient(address, new MockBasicService());
+    }
+
+    /**
+     * Creates a mock client backed by a {@link MockServerInfoService} that answers
+     * handshakes with {@code serverInfo}; the service is returned too so a test can
+     * change the server info later.
+     */
+    private static Pair<MockProxyClient, MockServerInfoService> createMockProxyClient(
+            SocketAddress address, ServerInfo serverInfo) {
+        MockServerInfoService service = new MockServerInfoService();
+        MockProxyClient proxyClient = new MockProxyClient(address, service);
+        service.updateServerInfo(serverInfo);
+        return Pair.of(proxyClient, service);
+    }
+
+    /**
+     * Copies the handshake results while holding the monitor that the listeners hold
+     * when they write to the map, so assertions never read the {@link HashMap} while a
+     * callback is updating it.
+     */
+    private static Map<SocketAddress, ServerInfo> snapshotOf(Map<SocketAddress, ServerInfo> results) {
+        synchronized (results) {
+            return new HashMap<SocketAddress, ServerInfo>(results);
+        }
+    }
+
+    /**
+     * Creating a client registers exactly one proxy and returns the instance the
+     * builder supplied.
+     */
+    @Test(timeout = 60000)
+    public void testBasicCreateRemove() throws Exception {
+        SocketAddress address = createSocketAddress(1000);
+        MockProxyClientBuilder builder = new MockProxyClientBuilder();
+        MockProxyClient mockProxyClient = createMockProxyClient(address);
+        builder.provideProxyClient(address, mockProxyClient);
+
+        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
+        assertEquals("There should be no clients in the manager",
+                0, clientManager.getNumProxies());
+        ProxyClient proxyClient = clientManager.createClient(address);
+        assertEquals("Create client should build the proxy client",
+                1, clientManager.getNumProxies());
+        assertTrue("The client returned should be the same client that builder built",
+                mockProxyClient == proxyClient);
+    }
+
+    /**
+     * Getting a client for an unknown address builds it on demand.
+     */
+    @Test(timeout = 60000)
+    public void testGetShouldCreateClient() throws Exception {
+        SocketAddress address = createSocketAddress(2000);
+        MockProxyClientBuilder builder = new MockProxyClientBuilder();
+        MockProxyClient mockProxyClient = createMockProxyClient(address);
+        builder.provideProxyClient(address, mockProxyClient);
+
+        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
+        assertEquals("There should be no clients in the manager",
+                0, clientManager.getNumProxies());
+        ProxyClient proxyClient = clientManager.getClient(address);
+        assertEquals("Get client should build the proxy client",
+                1, clientManager.getNumProxies());
+        assertTrue("The client returned should be the same client that builder built",
+                mockProxyClient == proxyClient);
+    }
+
+    /**
+     * Conditional removal only removes the proxy when the given client is the one
+     * the manager holds for that address.
+     */
+    @Test(timeout = 60000)
+    public void testConditionalRemoveClient() throws Exception {
+        SocketAddress address = createSocketAddress(3000);
+        MockProxyClientBuilder builder = new MockProxyClientBuilder();
+        MockProxyClient mockProxyClient = createMockProxyClient(address);
+        MockProxyClient anotherMockProxyClient = createMockProxyClient(address);
+        builder.provideProxyClient(address, mockProxyClient);
+
+        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
+        assertEquals("There should be no clients in the manager",
+                0, clientManager.getNumProxies());
+        clientManager.createClient(address);
+        assertEquals("Create client should build the proxy client",
+                1, clientManager.getNumProxies());
+        clientManager.removeClient(address, anotherMockProxyClient);
+        assertEquals("Conditional remove should not remove proxy client",
+                1, clientManager.getNumProxies());
+        clientManager.removeClient(address, mockProxyClient);
+        assertEquals("Conditional remove should remove proxy client",
+                0, clientManager.getNumProxies());
+    }
+
+    /**
+     * Unconditional removal drops the proxy registered for the address.
+     */
+    @Test(timeout = 60000)
+    public void testRemoveClient() throws Exception {
+        SocketAddress address = createSocketAddress(3000);
+        MockProxyClientBuilder builder = new MockProxyClientBuilder();
+        MockProxyClient mockProxyClient = createMockProxyClient(address);
+        builder.provideProxyClient(address, mockProxyClient);
+
+        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
+        assertEquals("There should be no clients in the manager",
+                0, clientManager.getNumProxies());
+        clientManager.createClient(address);
+        assertEquals("Create client should build the proxy client",
+                1, clientManager.getNumProxies());
+        clientManager.removeClient(address);
+        assertEquals("Remove should remove proxy client",
+                0, clientManager.getNumProxies());
+    }
+
+    /**
+     * Creating a client triggers a handshake whose server info reaches the listener.
+     */
+    @Test(timeout = 60000)
+    public void testCreateClientShouldHandshake() throws Exception {
+        SocketAddress address = createSocketAddress(3000);
+        MockProxyClientBuilder builder = new MockProxyClientBuilder();
+        ServerInfo serverInfo = new ServerInfo();
+        serverInfo.putToOwnerships(runtime.getMethodName() + "_stream",
+                runtime.getMethodName() + "_owner");
+        Pair<MockProxyClient, MockServerInfoService> mockProxyClient =
+                createMockProxyClient(address, serverInfo);
+        builder.provideProxyClient(address, mockProxyClient.getLeft());
+
+        final AtomicReference<ServerInfo> resultHolder = new AtomicReference<ServerInfo>(null);
+        final CountDownLatch doneLatch = new CountDownLatch(1);
+        ProxyListener listener = new ProxyListener() {
+            @Override
+            public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
+                resultHolder.set(serverInfo);
+                doneLatch.countDown();
+            }
+            @Override
+            public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
+            }
+        };
+
+        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
+        clientManager.registerProxyListener(listener);
+        assertEquals("There should be no clients in the manager",
+                0, clientManager.getNumProxies());
+        clientManager.createClient(address);
+        assertEquals("Create client should build the proxy client",
+                1, clientManager.getNumProxies());
+
+        // When a client is created, it would handshake with that proxy
+        doneLatch.await();
+        assertEquals("Handshake should return server info",
+                serverInfo, resultHolder.get());
+    }
+
+    /**
+     * An explicit handshake reaches every host of the host provider and reports each
+     * host's server info to the listener.
+     */
+    @Test(timeout = 60000)
+    public void testHandshake() throws Exception {
+        final int numHosts = 3;
+        final int numStreamsPerHost = 3;
+        final int initialPort = 4000;
+
+        MockProxyClientBuilder builder = new MockProxyClientBuilder();
+        Map<SocketAddress, ServerInfo> serverInfoMap =
+                new HashMap<SocketAddress, ServerInfo>();
+        for (int i = 0; i < numHosts; i++) {
+            SocketAddress address = createSocketAddress(initialPort + i);
+            ServerInfo serverInfo = new ServerInfo();
+            for (int j = 0; j < numStreamsPerHost; j++) {
+                serverInfo.putToOwnerships(runtime.getMethodName() + "_stream_" + j,
+                        address.toString());
+            }
+            Pair<MockProxyClient, MockServerInfoService> mockProxyClient =
+                    createMockProxyClient(address, serverInfo);
+            builder.provideProxyClient(address, mockProxyClient.getLeft());
+            serverInfoMap.put(address, serverInfo);
+        }
+
+        final Map<SocketAddress, ServerInfo> results = new HashMap<SocketAddress, ServerInfo>();
+        // Two success callbacks are awaited per host -- presumably one from creating the
+        // client and one from the explicit handshake; confirm against ProxyClientManager.
+        final CountDownLatch doneLatch = new CountDownLatch(2 * numHosts);
+        ProxyListener listener = new ProxyListener() {
+            @Override
+            public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
+                synchronized (results) {
+                    results.put(address, serverInfo);
+                }
+                doneLatch.countDown();
+            }
+
+            @Override
+            public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
+            }
+        };
+
+        TestHostProvider rs = new TestHostProvider();
+        ProxyClientManager clientManager = createProxyClientManager(builder, rs, 0L);
+        clientManager.registerProxyListener(listener);
+        assertEquals("There should be no clients in the manager",
+                0, clientManager.getNumProxies());
+        for (int i = 0; i < numHosts; i++) {
+            rs.addHost(createSocketAddress(initialPort + i));
+        }
+        // handshake would handshake with 3 hosts again
+        clientManager.handshake();
+        doneLatch.await();
+
+        Map<SocketAddress, ServerInfo> handshakeResults = snapshotOf(results);
+        assertEquals("Handshake should return server info",
+                numHosts, handshakeResults.size());
+        assertTrue("Handshake should get all server infos",
+                Maps.difference(serverInfoMap, handshakeResults).areEqual());
+    }
+
+    /**
+     * Once periodic handshake is enabled, server info changed on the mock services is
+     * picked up and reported to the listener without any explicit handshake call.
+     */
+    @Test(timeout = 60000)
+    public void testPeriodicHandshake() throws Exception {
+        final int numHosts = 3;
+        final int numStreamsPerHost = 3;
+        final int initialPort = 5000;
+
+        MockProxyClientBuilder builder = new MockProxyClientBuilder();
+        Map<SocketAddress, ServerInfo> serverInfoMap =
+                new HashMap<SocketAddress, ServerInfo>();
+        Map<SocketAddress, MockServerInfoService> mockServiceMap =
+                new HashMap<SocketAddress, MockServerInfoService>();
+        final Map<SocketAddress, CountDownLatch> hostDoneLatches =
+                new HashMap<SocketAddress, CountDownLatch>();
+        for (int i = 0; i < numHosts; i++) {
+            SocketAddress address = createSocketAddress(initialPort + i);
+            ServerInfo serverInfo = new ServerInfo();
+            for (int j = 0; j < numStreamsPerHost; j++) {
+                serverInfo.putToOwnerships(runtime.getMethodName() + "_stream_" + j,
+                        address.toString());
+            }
+            Pair<MockProxyClient, MockServerInfoService> mockProxyClient =
+                    createMockProxyClient(address, serverInfo);
+            builder.provideProxyClient(address, mockProxyClient.getLeft());
+            serverInfoMap.put(address, serverInfo);
+            mockServiceMap.put(address, mockProxyClient.getRight());
+            // Per host: the handshake on client creation plus one periodic handshake.
+            hostDoneLatches.put(address, new CountDownLatch(2));
+        }
+
+        final Map<SocketAddress, ServerInfo> results = new HashMap<SocketAddress, ServerInfo>();
+        final CountDownLatch doneLatch = new CountDownLatch(numHosts);
+        ProxyListener listener = new ProxyListener() {
+            @Override
+            public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
+                synchronized (results) {
+                    results.put(address, serverInfo);
+                    CountDownLatch latch = hostDoneLatches.get(address);
+                    if (null != latch) {
+                        latch.countDown();
+                    }
+                }
+                doneLatch.countDown();
+            }
+
+            @Override
+            public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
+            }
+        };
+
+        TestHostProvider rs = new TestHostProvider();
+        ProxyClientManager clientManager = createProxyClientManager(builder, rs, 50L);
+        clientManager.setPeriodicHandshakeEnabled(false);
+        clientManager.registerProxyListener(listener);
+
+        assertEquals("There should be no clients in the manager",
+                0, clientManager.getNumProxies());
+        for (int i = 0; i < numHosts; i++) {
+            SocketAddress address = createSocketAddress(initialPort + i);
+            rs.addHost(address);
+            clientManager.createClient(address);
+        }
+
+        // make sure the first 3 handshakes going through
+        doneLatch.await();
+
+        Map<SocketAddress, ServerInfo> initialResults = snapshotOf(results);
+        assertEquals("Handshake should return server info",
+                numHosts, initialResults.size());
+        assertTrue("Handshake should get all server infos",
+                Maps.difference(serverInfoMap, initialResults).areEqual());
+
+        // update server info
+        for (int i = 0; i < numHosts; i++) {
+            SocketAddress address = createSocketAddress(initialPort + i);
+            ServerInfo serverInfo = new ServerInfo();
+            for (int j = 0; j < numStreamsPerHost; j++) {
+                serverInfo.putToOwnerships(runtime.getMethodName() + "_new_stream_" + j,
+                        address.toString());
+            }
+            MockServerInfoService service = mockServiceMap.get(address);
+            serverInfoMap.put(address, serverInfo);
+            service.updateServerInfo(serverInfo);
+        }
+
+        clientManager.setPeriodicHandshakeEnabled(true);
+        for (int i = 0; i < numHosts; i++) {
+            SocketAddress address = createSocketAddress(initialPort + i);
+            CountDownLatch latch = hostDoneLatches.get(address);
+            latch.await();
+        }
+
+        // Periodic handshakes may still fire and write to results from the listener,
+        // so compare against a copy taken under the same lock the listener uses.
+        assertTrue("Periodic handshake should update all server infos",
+                Maps.difference(serverInfoMap, snapshotOf(results)).areEqual());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestConsistentHashRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestConsistentHashRoutingService.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestConsistentHashRoutingService.java
new file mode 100644
index 0000000..f44cddd
--- /dev/null
+++ b/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestConsistentHashRoutingService.java
@@ -0,0 +1,417 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
+import org.apache.distributedlog.service.DLSocketAddress;
+import com.twitter.finagle.Address;
+import com.twitter.finagle.Addresses;
+import com.twitter.finagle.ChannelWriteException;
+import com.twitter.finagle.NoBrokersAvailableException;
+import com.twitter.finagle.stats.NullStatsReceiver;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+
+/**
+ * Test Case for {@link ConsistentHashRoutingService}.
+ */
+public class TestConsistentHashRoutingService {
+
+    /**
+     * Removes the only host with a write failure and expects routing to fail with
+     * {@link NoBrokersAvailableException} until the 2 second blackout configured on
+     * the builder has elapsed, after which the host is expected to be returned again.
+     */
+    @Test(timeout = 60000)
+    public void testBlackoutHost() throws Exception {
+        TestName name = new TestName();
+        RoutingService routingService = ConsistentHashRoutingService.newBuilder()
+                .serverSet(new NameServerSet(name))
+                .resolveFromName(true)
+                .numReplicas(997)
+                .blackoutSeconds(2)
+                .build();
+
+        InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", 3181);
+        Address address = Addresses.newInetAddress(inetAddress);
+        List<Address> addresses = new ArrayList<Address>(1);
+        addresses.add(address);
+        name.changeAddrs(addresses);
+
+        routingService.startService();
+        // stop the service even when one of the assertions below fails
+        try {
+            RoutingService.RoutingContext routingContext =
+                    RoutingService.RoutingContext.of(new DefaultRegionResolver());
+
+            String streamName = "test-blackout-host";
+            assertEquals(inetAddress, routingService.getHost(streamName, routingContext));
+            routingService.removeHost(inetAddress, new ChannelWriteException(new IOException("test exception")));
+            try {
+                routingService.getHost(streamName, routingContext);
+                fail("Should fail to get host since no brokers are available");
+            } catch (NoBrokersAvailableException nbae) {
+                // expected
+            }
+
+            // sleep past the 2 second blackout before asking for the host again
+            TimeUnit.SECONDS.sleep(3);
+            assertEquals(inetAddress, routingService.getHost(streamName, routingContext));
+        } finally {
+            routingService.stopService();
+        }
+    }
+
+    /**
+     * Drives a name-backed {@link ConsistentHashRoutingService} through three address
+     * lists (initial, partially overlapping, fully disjoint) and checks the join/left
+     * notifications and the shard/address maps after each change.
+     */
+    @Test(timeout = 60000)
+    public void testPerformServerSetChangeOnName() throws Exception {
+        TestName name = new TestName();
+        ConsistentHashRoutingService routingService = (ConsistentHashRoutingService)
+                ConsistentHashRoutingService.newBuilder()
+                        .serverSet(new NameServerSet(name))
+                        .resolveFromName(true)
+                        .numReplicas(997)
+                        .build();
+
+        int basePort = 3180;
+        int numHosts = 4;
+        List<Address> addresses1 = Lists.newArrayListWithExpectedSize(4);
+        List<Address> addresses2 = Lists.newArrayListWithExpectedSize(4);
+        List<Address> addresses3 = Lists.newArrayListWithExpectedSize(4);
+
+        // fill up the addresses1
+        for (int i = 0; i < numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i);
+            Address address = Addresses.newInetAddress(inetAddress);
+            addresses1.add(address);
+        }
+        // fill up the addresses2 - overlap with addresses1
+        for (int i = 0; i < numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 2 + i);
+            Address address = Addresses.newInetAddress(inetAddress);
+            addresses2.add(address);
+        }
+        // fill up the addresses3 - not overlap with addresses2
+        for (int i = 0; i < numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 10 + i);
+            Address address = Addresses.newInetAddress(inetAddress);
+            addresses3.add(address);
+        }
+
+        // the lists double as monitors: the listener notifies, the test thread waits on them
+        final List<SocketAddress> leftAddresses = Lists.newArrayList();
+        final List<SocketAddress> joinAddresses = Lists.newArrayList();
+
+        RoutingService.RoutingListener routingListener = new RoutingService.RoutingListener() {
+            @Override
+            public void onServerLeft(SocketAddress address) {
+                synchronized (leftAddresses) {
+                    leftAddresses.add(address);
+                    leftAddresses.notifyAll();
+                }
+            }
+
+            @Override
+            public void onServerJoin(SocketAddress address) {
+                synchronized (joinAddresses) {
+                    joinAddresses.add(address);
+                    joinAddresses.notifyAll();
+                }
+            }
+        };
+
+        routingService.registerListener(routingListener);
+        name.changeAddrs(addresses1);
+
+        routingService.startService();
+
+        // block until all initial hosts were reported as joined (bounded by the test timeout)
+        synchronized (joinAddresses) {
+            while (joinAddresses.size() < numHosts) {
+                joinAddresses.wait();
+            }
+        }
+
+        // validate 4 nodes joined
+        synchronized (joinAddresses) {
+            assertEquals(numHosts, joinAddresses.size());
+        }
+        synchronized (leftAddresses) {
+            assertEquals(0, leftAddresses.size());
+        }
+        assertEquals(numHosts, routingService.shardId2Address.size());
+        assertEquals(numHosts, routingService.address2ShardId.size());
+        // both maps must agree with each other for every initial host
+        for (int i = 0; i < numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i);
+            assertTrue(routingService.address2ShardId.containsKey(inetAddress));
+            int shardId = routingService.address2ShardId.get(inetAddress);
+            SocketAddress sa = routingService.shardId2Address.get(shardId);
+            assertNotNull(sa);
+            assertEquals(inetAddress, sa);
+        }
+
+        // update addresses2 - 2 new hosts joined, 2 old hosts left
+        name.changeAddrs(addresses2);
+        synchronized (joinAddresses) {
+            while (joinAddresses.size() < numHosts + 2) {
+                joinAddresses.wait();
+            }
+        }
+        synchronized (leftAddresses) {
+            while (leftAddresses.size() < numHosts - 2) {
+                leftAddresses.wait();
+            }
+        }
+        assertEquals(numHosts, routingService.shardId2Address.size());
+        assertEquals(numHosts, routingService.address2ShardId.size());
+
+        // first 2 shards should leave
+        for (int i = 0; i < 2; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i);
+            assertFalse(routingService.address2ShardId.containsKey(inetAddress));
+        }
+
+        // all 4 hosts of addresses2 should be mapped consistently in both directions
+        for (int i = 0; i < numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 2 + i);
+            assertTrue(routingService.address2ShardId.containsKey(inetAddress));
+            int shardId = routingService.address2ShardId.get(inetAddress);
+            SocketAddress sa = routingService.shardId2Address.get(shardId);
+            assertNotNull(sa);
+            assertEquals(inetAddress, sa);
+        }
+
+        // update addresses3 - 4 new hosts joined, 4 old hosts left
+        name.changeAddrs(addresses3);
+        synchronized (joinAddresses) {
+            while (joinAddresses.size() < numHosts + 2 + numHosts) {
+                joinAddresses.wait();
+            }
+        }
+        synchronized (leftAddresses) {
+            while (leftAddresses.size() < numHosts - 2 + numHosts) {
+                leftAddresses.wait();
+            }
+        }
+        assertEquals(numHosts, routingService.shardId2Address.size());
+        assertEquals(numHosts, routingService.address2ShardId.size());
+
+        // first 6 shards should leave
+        for (int i = 0; i < 2 + numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i);
+            assertFalse(routingService.address2ShardId.containsKey(inetAddress));
+        }
+        // new 4 shards should exist
+        for (int i = 0; i < numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 10 + i);
+            assertTrue(routingService.address2ShardId.containsKey(inetAddress));
+            int shardId = routingService.address2ShardId.get(inetAddress);
+            SocketAddress sa = routingService.shardId2Address.get(shardId);
+            assertNotNull(sa);
+            assertEquals(inetAddress, sa);
+        }
+
+    }
+
+    /**
+     * In-memory {@link ServerSetWatcher} for tests. Changes published while no monitor
+     * is registered are queued and replayed once a monitor starts watching.
+     */
+    private static class TestServerSetWatcher implements ServerSetWatcher {
+
+        final LinkedBlockingQueue<ImmutableSet<DLSocketAddress>> changeQueue =
+                new LinkedBlockingQueue<ImmutableSet<DLSocketAddress>>();
+        final CopyOnWriteArrayList<ServerSetMonitor> monitors =
+                new CopyOnWriteArrayList<ServerSetMonitor>();
+
+        /**
+         * Registers the monitor, then drains every queued change through
+         * {@link #notifyChanges(ImmutableSet)}.
+         */
+        @Override
+        public void watch(ServerSetMonitor monitor) throws MonitorException {
+            monitors.add(monitor);
+            for (ImmutableSet<DLSocketAddress> pending = changeQueue.poll();
+                 pending != null;
+                 pending = changeQueue.poll()) {
+                notifyChanges(pending);
+            }
+        }
+
+        /**
+         * Delivers the address set to all registered monitors, or queues it when
+         * there is none yet.
+         */
+        void notifyChanges(ImmutableSet<DLSocketAddress> addresses) {
+            if (monitors.isEmpty()) {
+                // nobody is watching yet; keep the change for a later watch() call
+                changeQueue.add(addresses);
+                return;
+            }
+            for (ServerSetMonitor registered : monitors) {
+                registered.onChange(addresses);
+            }
+        }
+    }
+
+    /**
+     * Drives a {@link ConsistentHashRoutingService} backed by {@link TestServerSetWatcher}
+     * through three server sets that carry explicit shard ids, and checks the join/left
+     * notifications and the shard/address maps after each change.
+     */
+    @Test(timeout = 60000)
+    public void testPerformServerSetChangeOnServerSet() throws Exception {
+        TestServerSetWatcher serverSetWatcher = new TestServerSetWatcher();
+        ConsistentHashRoutingService routingService = new ConsistentHashRoutingService(
+                serverSetWatcher, 997, Integer.MAX_VALUE, NullStatsReceiver.get());
+
+        int basePort = 3180;
+        int numHosts = 4;
+        Set<DLSocketAddress> addresses1 = Sets.newConcurrentHashSet();
+        Set<DLSocketAddress> addresses2 = Sets.newConcurrentHashSet();
+        Set<DLSocketAddress> addresses3 = Sets.newConcurrentHashSet();
+
+        // fill up the addresses1 - shards 0..3 on ports basePort..basePort+3
+        for (int i = 0; i < numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i);
+            DLSocketAddress dsa = new DLSocketAddress(i, inetAddress);
+            addresses1.add(dsa);
+        }
+        // fill up the addresses2 - new ports, shard ids 2..5 overlap with addresses1 on shards 2 and 3
+        for (int i = 0; i < numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + numHosts + i);
+            DLSocketAddress dsa = new DLSocketAddress(i + 2, inetAddress);
+            addresses2.add(dsa);
+        }
+        // fill up the addresses3 - shards 0..3 on ports that do not overlap with addresses2
+        for (int i = 0; i < numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 10 + i);
+            DLSocketAddress dsa = new DLSocketAddress(i, inetAddress);
+            addresses3.add(dsa);
+        }
+
+        // the lists double as monitors: the listener notifies, the test thread waits on them
+        final List<SocketAddress> leftAddresses = Lists.newArrayList();
+        final List<SocketAddress> joinAddresses = Lists.newArrayList();
+
+        RoutingService.RoutingListener routingListener = new RoutingService.RoutingListener() {
+            @Override
+            public void onServerLeft(SocketAddress address) {
+                synchronized (leftAddresses) {
+                    leftAddresses.add(address);
+                    leftAddresses.notifyAll();
+                }
+            }
+
+            @Override
+            public void onServerJoin(SocketAddress address) {
+                synchronized (joinAddresses) {
+                    joinAddresses.add(address);
+                    joinAddresses.notifyAll();
+                }
+            }
+        };
+
+        routingService.registerListener(routingListener);
+        // published before any monitor is registered, so the watcher queues it
+        serverSetWatcher.notifyChanges(ImmutableSet.copyOf(addresses1));
+
+        routingService.startService();
+
+        // block until all initial hosts were reported as joined (bounded by the test timeout)
+        synchronized (joinAddresses) {
+            while (joinAddresses.size() < numHosts) {
+                joinAddresses.wait();
+            }
+        }
+
+        // validate 4 nodes joined
+        synchronized (joinAddresses) {
+            assertEquals(numHosts, joinAddresses.size());
+        }
+        synchronized (leftAddresses) {
+            assertEquals(0, leftAddresses.size());
+        }
+        assertEquals(numHosts, routingService.shardId2Address.size());
+        assertEquals(numHosts, routingService.address2ShardId.size());
+        for (int i = 0; i < numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i);
+            assertTrue(routingService.address2ShardId.containsKey(inetAddress));
+            int shardId = routingService.address2ShardId.get(inetAddress);
+            assertEquals(i, shardId);
+            SocketAddress sa = routingService.shardId2Address.get(shardId);
+            assertNotNull(sa);
+            assertEquals(inetAddress, sa);
+        }
+
+        // update addresses2 - wait for at least 2 more joins and 2 old hosts leaving
+        serverSetWatcher.notifyChanges(ImmutableSet.copyOf(addresses2));
+        synchronized (joinAddresses) {
+            while (joinAddresses.size() < numHosts + 2) {
+                joinAddresses.wait();
+            }
+        }
+        synchronized (leftAddresses) {
+            while (leftAddresses.size() < 2) {
+                leftAddresses.wait();
+            }
+        }
+
+        // shards 0..5 are expected to be known now
+        assertEquals(numHosts + 2, routingService.shardId2Address.size());
+        assertEquals(numHosts + 2, routingService.address2ShardId.size());
+        // first 2 shards should not leave
+        for (int i = 0; i < 2; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i);
+            assertTrue(routingService.address2ShardId.containsKey(inetAddress));
+            int shardId = routingService.address2ShardId.get(inetAddress);
+            assertEquals(i, shardId);
+            SocketAddress sa = routingService.shardId2Address.get(shardId);
+            assertNotNull(sa);
+            assertEquals(inetAddress, sa);
+        }
+        // the addresses2 hosts should own shards 2..5
+        for (int i = 0; i < numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + numHosts + i);
+            assertTrue(routingService.address2ShardId.containsKey(inetAddress));
+            int shardId = routingService.address2ShardId.get(inetAddress);
+            assertEquals(i + 2, shardId);
+            SocketAddress sa = routingService.shardId2Address.get(shardId);
+            assertNotNull(sa);
+            assertEquals(inetAddress, sa);
+        }
+
+        // update addresses3
+        serverSetWatcher.notifyChanges(ImmutableSet.copyOf(addresses3));
+        synchronized (joinAddresses) {
+            while (joinAddresses.size() < numHosts + 2 + numHosts) {
+                joinAddresses.wait();
+            }
+        }
+        synchronized (leftAddresses) {
+            while (leftAddresses.size() < 2 + numHosts) {
+                leftAddresses.wait();
+            }
+        }
+        assertEquals(numHosts + 2, routingService.shardId2Address.size());
+        assertEquals(numHosts + 2, routingService.address2ShardId.size());
+
+        // shards 0..3 should now be owned by the addresses3 hosts
+        for (int i = 0; i < numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 10 + i);
+            assertTrue(routingService.address2ShardId.containsKey(inetAddress));
+            int shardId = routingService.address2ShardId.get(inetAddress);
+            assertEquals(i, shardId);
+            SocketAddress sa = routingService.shardId2Address.get(shardId);
+            assertNotNull(sa);
+            assertEquals(inetAddress, sa);
+        }
+        // the other 2 shards should be still there
+        for (int i = 0; i < 2; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + numHosts + 2 + i);
+            assertTrue(routingService.address2ShardId.containsKey(inetAddress));
+            int shardId = routingService.address2ShardId.get(inetAddress);
+            assertEquals(numHosts + i, shardId);
+            SocketAddress sa = routingService.shardId2Address.get(shardId);
+            assertNotNull(sa);
+            assertEquals(inetAddress, sa);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestInetNameResolution.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestInetNameResolution.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestInetNameResolution.java
new file mode 100644
index 0000000..59665b9
--- /dev/null
+++ b/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestInetNameResolution.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import com.google.common.collect.ImmutableSet;
+import com.twitter.common.net.pool.DynamicHostSet;
+import com.twitter.thrift.Endpoint;
+import com.twitter.thrift.ServiceInstance;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test Case for `inet` name resolution.
+ */
+public class TestInetNameResolution {
+
+    private static final Logger logger = LoggerFactory.getLogger(TestInetNameResolution.class);
+
+    /**
+     * Watches a {@link NameServerSet} built from an {@code inet!} name and expects the
+     * first non-empty host set to hold exactly one instance whose {@code thrift}
+     * endpoint is 127.0.0.1:3181.
+     */
+    @Test(timeout = 10000)
+    public void testInetNameResolution() throws Exception {
+        String nameStr = "inet!127.0.0.1:3181";
+        final CountDownLatch resolved = new CountDownLatch(1);
+        final AtomicBoolean validationFailed = new AtomicBoolean(false);
+
+        NameServerSet serverSet = new NameServerSet(nameStr);
+        serverSet.watch(new DynamicHostSet.HostChangeMonitor<ServiceInstance>() {
+            @Override
+            public void onChange(ImmutableSet<ServiceInstance> hostSet) {
+                // an empty host set is ignored; the latch is only released by a non-empty one
+                if (hostSet.size() > 1) {
+                    logger.error("HostSet has more elements than expected {}", hostSet);
+                    validationFailed.set(true);
+                    resolved.countDown();
+                } else if (hostSet.size() == 1) {
+                    ServiceInstance serviceInstance = hostSet.iterator().next();
+                    Endpoint endpoint = serviceInstance.getAdditionalEndpoints().get("thrift");
+                    InetSocketAddress address = new InetSocketAddress(endpoint.getHost(), endpoint.getPort());
+                    if (endpoint.getPort() != 3181) {
+                        logger.error("Port does not match the expected port {}", endpoint.getPort());
+                        validationFailed.set(true);
+                    } else if (!address.getAddress().getHostAddress().equals("127.0.0.1")) {
+                        logger.error("Host address does not match the expected address {}",
+                            address.getAddress().getHostAddress());
+                        validationFailed.set(true);
+                    }
+                    resolved.countDown();
+                }
+            }
+        });
+
+        // failures are recorded in a flag and asserted here, on the test thread
+        resolved.await();
+        Assert.assertFalse("Unexpected host set resolved for " + nameStr, validationFailed.get());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestRegionsRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestRegionsRoutingService.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestRegionsRoutingService.java
new file mode 100644
index 0000000..151663e
--- /dev/null
+++ b/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestRegionsRoutingService.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Sets;
+import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
+import org.apache.distributedlog.thrift.service.StatusCode;
+import com.twitter.finagle.NoBrokersAvailableException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Test;
+
+/**
+ * Test Case for {@link RegionsRoutingService}.
+ */
+public class TestRegionsRoutingService {
+
+    /**
+     * Builds one routing service per region and expects the listener registered on the
+     * {@link RegionsRoutingService} to see a join for every host and no host leaving.
+     */
+    @Test(timeout = 60000)
+    public void testRoutingListener() throws Exception {
+        int numRoutingServices = 5;
+        RoutingService.Builder[] routingServiceBuilders = new RoutingService.Builder[numRoutingServices];
+        Set<SocketAddress> hosts = new HashSet<SocketAddress>();
+        Map<SocketAddress, String> regionMap = new HashMap<SocketAddress, String>();
+        for (int i = 0; i < numRoutingServices; i++) {
+            String finagleNameStr = "inet!127.0.0.1:" + (3181 + i);
+            routingServiceBuilders[i] = RoutingUtils.buildRoutingService(finagleNameStr);
+            SocketAddress address = new InetSocketAddress("127.0.0.1", 3181 + i);
+            hosts.add(address);
+            regionMap.put(address, "region-" + i);
+        }
+
+        final CountDownLatch doneLatch = new CountDownLatch(numRoutingServices);
+        final AtomicInteger numHostsLeft = new AtomicInteger(0);
+        // written from listener callbacks, so use a concurrent set like the other
+        // callback-shared state (AtomicInteger, CountDownLatch) in this test
+        final Set<SocketAddress> jointHosts = Sets.newConcurrentHashSet();
+        RegionsRoutingService regionsRoutingService =
+                RegionsRoutingService.newBuilder()
+                    .routingServiceBuilders(routingServiceBuilders)
+                    .resolver(new DefaultRegionResolver(regionMap))
+                    .build();
+        regionsRoutingService.registerListener(new RoutingService.RoutingListener() {
+            @Override
+            public void onServerLeft(SocketAddress address) {
+                numHostsLeft.incrementAndGet();
+            }
+
+            @Override
+            public void onServerJoin(SocketAddress address) {
+                jointHosts.add(address);
+                doneLatch.countDown();
+            }
+        });
+
+        regionsRoutingService.startService();
+
+        // wait for one join notification per routing service (bounded by the test timeout)
+        doneLatch.await();
+
+        assertEquals(numRoutingServices, jointHosts.size());
+        assertEquals(0, numHostsLeft.get());
+        assertTrue(Sets.difference(hosts, jointHosts).immutableCopy().isEmpty());
+    }
+
+    /**
+     * Checks which host {@link RegionsRoutingService#getHost} returns as more hosts are
+     * recorded as tried in the routing context, and that it fails with
+     * {@link NoBrokersAvailableException} once all three hosts were tried.
+     */
+    @Test(timeout = 60000)
+    public void testGetHost() throws Exception {
+        int numRoutingServices = 3;
+        RoutingService.Builder[] routingServiceBuilders = new RoutingService.Builder[numRoutingServices];
+        Map<SocketAddress, String> regionMap = new HashMap<SocketAddress, String>();
+        // one routing service per host, each host mapped to its own region
+        for (int i = 0; i < numRoutingServices; i++) {
+            String finagleNameStr = "inet!127.0.0.1:" + (3181 + i);
+            routingServiceBuilders[i] = RoutingUtils.buildRoutingService(finagleNameStr);
+            SocketAddress address = new InetSocketAddress("127.0.0.1", 3181 + i);
+            regionMap.put(address, "region-" + i);
+        }
+
+        RegionsRoutingService regionsRoutingService =
+                RegionsRoutingService.newBuilder()
+                    .resolver(new DefaultRegionResolver(regionMap))
+                    .routingServiceBuilders(routingServiceBuilders)
+                    .build();
+        regionsRoutingService.startService();
+
+        // only 3183 tried: 3181 is expected
+        RoutingService.RoutingContext routingContext =
+                RoutingService.RoutingContext.of(new DefaultRegionResolver())
+                        .addTriedHost(new InetSocketAddress("127.0.0.1", 3183), StatusCode.WRITE_EXCEPTION);
+        assertEquals(new InetSocketAddress("127.0.0.1", 3181),
+                regionsRoutingService.getHost("any", routingContext));
+
+        // fresh context with only 3181 tried: 3182 is expected
+        routingContext =
+                RoutingService.RoutingContext.of(new DefaultRegionResolver())
+                        .addTriedHost(new InetSocketAddress("127.0.0.1", 3181), StatusCode.WRITE_EXCEPTION);
+        assertEquals(new InetSocketAddress("127.0.0.1", 3182),
+                regionsRoutingService.getHost("any", routingContext));
+
+        // add 3182 to routing context as tried host
+        routingContext.addTriedHost(new InetSocketAddress("127.0.0.1", 3182), StatusCode.WRITE_EXCEPTION);
+        assertEquals(new InetSocketAddress("127.0.0.1", 3183),
+                regionsRoutingService.getHost("any", routingContext));
+
+        // add 3183 to routing context as tried host
+        routingContext.addTriedHost(new InetSocketAddress("127.0.0.1", 3183), StatusCode.WRITE_EXCEPTION);
+        try {
+            regionsRoutingService.getHost("any", routingContext);
+            fail("Should fail to get host since all regions are tried.");
+        } catch (NoBrokersAvailableException nbae) {
+            // expected
+        }
+    }
+
+}


Mime
View raw message