distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [09/31] incubator-distributedlog git commit: DL-157: resource placement for write proxy
Date Fri, 30 Dec 2016 00:07:23 GMT
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
index d7a0ba6..1bfe352 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
@@ -28,6 +28,7 @@ import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
 import com.twitter.distributedlog.exceptions.StreamUnavailableException;
 import com.twitter.distributedlog.service.config.NullStreamConfigProvider;
 import com.twitter.distributedlog.service.config.ServerConfiguration;
+import com.twitter.distributedlog.service.placement.EqualLoadAppraiser;
 import com.twitter.distributedlog.service.stream.WriteOp;
 import com.twitter.distributedlog.service.stream.StreamImpl.StreamStatus;
 import com.twitter.distributedlog.service.stream.StreamImpl;
@@ -140,16 +141,17 @@ public class TestDistributedLogService extends TestDistributedLogBase
{
             converter = new IdentityStreamPartitionConverter();
         }
         return new DistributedLogServiceImpl(
-                serverConf,
-                dlConf,
-                ConfUtils.getConstDynConf(dlConf),
-                new NullStreamConfigProvider(),
-                uri,
-                converter,
-                new LocalRoutingService(),
-                NullStatsLogger.INSTANCE,
-                NullStatsLogger.INSTANCE,
-                latch);
+            serverConf,
+            dlConf,
+            ConfUtils.getConstDynConf(dlConf),
+            new NullStreamConfigProvider(),
+            uri,
+            converter,
+            new LocalRoutingService(),
+            NullStatsLogger.INSTANCE,
+            NullStatsLogger.INSTANCE,
+            latch,
+            new EqualLoadAppraiser());
     }
 
     private StreamImpl createUnstartedStream(DistributedLogServiceImpl service,
@@ -777,21 +779,21 @@ public class TestDistributedLogService extends TestDistributedLogBase
{
                 .addHost("stream-0", service.getServiceAddress().getSocketAddress())
                 .setAllowRetrySameHost(false);
 
-        // routing service doesn't know 'stream-1'
+        service.startPlacementPolicy();
+
         WriteResponse response = FutureUtils.result(service.getOwner("stream-1", new WriteContext()));
-        assertEquals(StatusCode.STREAM_UNAVAILABLE, response.getHeader().getCode());
+        assertEquals(StatusCode.FOUND, response.getHeader().getCode());
+        assertEquals(service.getServiceAddress().toString(),
+                response.getHeader().getLocation());
 
-        // service cache "stream-2" but not acquire
+        // service cache "stream-2"
         StreamImpl stream = (StreamImpl) service.getStreamManager().getOrCreateStream("stream-2",
false);
-        response = FutureUtils.result(service.getOwner("stream-2", new WriteContext()));
-        assertEquals(StatusCode.STREAM_UNAVAILABLE, response.getHeader().getCode());
-
         // create write ops to stream-2 to make service acquire the stream
         WriteOp op = createWriteOp(service, "stream-2", 0L);
         stream.submit(op);
         stream.start();
         WriteResponse wr = Await.result(op.result());
-        assertEquals("Op  should succeed",
+        assertEquals("Op should succeed",
                 StatusCode.SUCCESS, wr.getHeader().getCode());
         assertEquals("Service should acquire stream",
                 StreamStatus.INITIALIZED, stream.getStatus());
@@ -804,18 +806,6 @@ public class TestDistributedLogService extends TestDistributedLogBase
{
         assertEquals(StatusCode.FOUND, response.getHeader().getCode());
         assertEquals(service.getServiceAddress().toString(),
                 response.getHeader().getLocation());
-
-        // find the stream from the routing service
-        response = FutureUtils.result(service.getOwner("stream-0", new WriteContext()));
-        assertEquals(StatusCode.FOUND, response.getHeader().getCode());
-        assertEquals(service.getServiceAddress().toString(),
-                response.getHeader().getLocation());
-
-        // add the tried host
-        WriteContext ctx = new WriteContext();
-        ctx.addToTriedHosts(DLSocketAddress.toString(service.getServiceAddress().getSocketAddress()));
-        response = FutureUtils.result(service.getOwner("stream-0", ctx));
-        assertEquals(StatusCode.STREAM_UNAVAILABLE, response.getHeader().getCode());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
new file mode 100644
index 0000000..ab4eeae
--- /dev/null
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.LinkedHashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.runtime.BoxedUnit;
+
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.twitter.distributedlog.client.routing.RoutingService;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.util.Await;
+import com.twitter.util.Duration;
+import com.twitter.util.Function0;
+import com.twitter.util.Future;
+import com.twitter.util.ScheduledThreadPoolTimer;
+import com.twitter.util.Time;
+import com.twitter.util.Timer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestLeastLoadPlacementPolicy {
+
+  @Test
+  public void testCalculateBalances() throws Exception {
+    int numSevers = new Random().nextInt(20) + 1;
+    int numStreams = new Random().nextInt(200) + 1;
+    RoutingService mockRoutingService = mock(RoutingService.class);
+    DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
+    LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
+        new EqualLoadAppraiser(), mockRoutingService, mockNamespace, null, Duration.fromSeconds(600),
new NullStatsLogger());
+    TreeSet<ServerLoad> serverLoads = Await.result(leastLoadPlacementPolicy.calculate(generateServers(numSevers),
generateStreams(numStreams)));
+    long lowLoadPerServer = numStreams / numSevers;
+    long highLoadPerServer = lowLoadPerServer + 1;
+    for (ServerLoad serverLoad: serverLoads) {
+      long load = serverLoad.getLoad();
+      assertEquals(load, serverLoad.getStreamLoads().size());
+      assertTrue(String.format("Load %d is not between %d and %d", load, lowLoadPerServer,
highLoadPerServer), load == lowLoadPerServer || load == highLoadPerServer);
+    }
+  }
+
+  @Test
+  public void testRefreshAndPlaceStream() throws Exception {
+    int numSevers = new Random().nextInt(20) + 1;
+    int numStreams = new Random().nextInt(200) + 1;
+    RoutingService mockRoutingService = mock(RoutingService.class);
+    when(mockRoutingService.getHosts()).thenReturn(generateSocketAddresses(numSevers));
+    DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
+    try {
+      when(mockNamespace.getLogs()).thenReturn(generateStreams(numStreams).iterator());
+    } catch (IOException e) {
+      fail();
+    }
+    PlacementStateManager mockPlacementStateManager = mock(PlacementStateManager.class);
+    LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
+        new EqualLoadAppraiser(), mockRoutingService, mockNamespace, mockPlacementStateManager,
Duration.fromSeconds(600), new NullStatsLogger());
+    leastLoadPlacementPolicy.refresh();
+
+    final ArgumentCaptor<TreeSet> captor = ArgumentCaptor.forClass(TreeSet.class);
+    verify(mockPlacementStateManager).saveOwnership(captor.capture());
+    TreeSet<ServerLoad> serverLoads = (TreeSet<ServerLoad>)captor.getValue();
+    ServerLoad next = serverLoads.first();
+    String serverPlacement = Await.result(leastLoadPlacementPolicy.placeStream("newstream1"));
+    assertEquals(next.getServer(), serverPlacement);
+  }
+
+  @Test
+  public void testCalculateUnequalWeight() throws Exception {
+    int numSevers = new Random().nextInt(20) + 1;
+    int numStreams = new Random().nextInt(200) + 1;
+    /* use AtomicInteger to have a final object in answer method */
+    final AtomicInteger maxLoad = new AtomicInteger(Integer.MIN_VALUE);
+    RoutingService mockRoutingService = mock(RoutingService.class);
+    DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
+    LoadAppraiser mockLoadAppraiser = mock(LoadAppraiser.class);
+    when(mockLoadAppraiser.getStreamLoad(anyString())).then(new Answer<Future<StreamLoad>>()
{
+      @Override
+      public Future<StreamLoad> answer(InvocationOnMock invocationOnMock) throws Throwable
{
+        int load = new Random().nextInt(100000);
+        if (load > maxLoad.get()) {
+          maxLoad.set(load);
+        }
+        return Future.value(new StreamLoad(invocationOnMock.getArguments()[0].toString(),
load));
+      }
+    });
+    LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
+        mockLoadAppraiser, mockRoutingService, mockNamespace, null, Duration.fromSeconds(600),
new NullStatsLogger());
+    TreeSet<ServerLoad> serverLoads = Await.result(leastLoadPlacementPolicy.calculate(generateServers(numSevers),
generateStreams(numStreams)));
+    long highestLoadSeen = Long.MIN_VALUE;
+    long lowestLoadSeen = Long.MAX_VALUE;
+    for (ServerLoad serverLoad: serverLoads) {
+      long load = serverLoad.getLoad();
+      if (load < lowestLoadSeen) {
+        lowestLoadSeen = load;
+      }
+      if (load > highestLoadSeen) {
+        highestLoadSeen = load;
+      }
+    }
+    assertTrue(highestLoadSeen - lowestLoadSeen < maxLoad.get());
+  }
+
+  private Set<SocketAddress> generateSocketAddresses(int num) {
+    LinkedHashSet<SocketAddress> socketAddresses = new LinkedHashSet<SocketAddress>();
+    for (int i = 0; i < num; i++) {
+      socketAddresses.add(new InetSocketAddress(i));
+    }
+    return socketAddresses;
+  }
+
+  private Set<String> generateStreams(int num) {
+    LinkedHashSet<String> streams = new LinkedHashSet<String>();
+    for (int i = 0; i < num; i++) {
+      streams.add("stream_" + i);
+    }
+    return streams;
+  }
+
+  private Set<String> generateServers(int num) {
+    LinkedHashSet<String> servers = new LinkedHashSet<String>();
+    for (int i = 0; i < num; i++) {
+      servers.add("server_" + i);
+    }
+    return servers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java
b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java
new file mode 100644
index 0000000..bbd7e72
--- /dev/null
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestServerLoad {
+
+  @Test
+  public void testSerializeDeserialize() throws IOException {
+    final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3");
+    for (int i = 0; i < 20; i++) {
+      serverLoad.addStream(new StreamLoad("stream-"+i, i));
+    }
+    assertEquals(serverLoad, ServerLoad.deserialize(serverLoad.serialize()));
+  }
+
+  @Test
+  public void testGetLoad() throws IOException {
+    final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3");
+    assertEquals(0, serverLoad.getLoad());
+    serverLoad.addStream(new StreamLoad("stream-"+1, 3));
+    assertEquals(3, serverLoad.getLoad());
+    serverLoad.addStream(new StreamLoad("stream-"+2, 7));
+    assertEquals(10, serverLoad.getLoad());
+    serverLoad.addStream(new StreamLoad("stream-"+3, 1));
+    assertEquals(11, serverLoad.getLoad());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java
b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java
new file mode 100644
index 0000000..3a3e5c0
--- /dev/null
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestStreamLoad {
+
+  @Test
+  public void testSerializeDeserialize() throws IOException {
+    final String streamName = "aHellaRandomStreamName";
+    final int load = 1337;
+    final StreamLoad streamLoad = new StreamLoad(streamName, load);
+    assertEquals(streamLoad, StreamLoad.deserialize(streamLoad.serialize()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java
b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java
new file mode 100644
index 0000000..b104952
--- /dev/null
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.curator.test.TestingServer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.twitter.distributedlog.DistributedLogConfiguration;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import static com.twitter.distributedlog.LocalDLMEmulator.DLOG_NAMESPACE;
+
+public class TestZKPlacementStateManager {
+  private TestingServer zkTestServer;
+  private String zkServers;
+  private URI uri;
+  private ZKPlacementStateManager zkPlacementStateManager;
+
+  @Before
+  public void startZookeeper() throws Exception {
+    zkTestServer = new TestingServer(2181);
+    zkServers = "127.0.0.1:2181";
+    uri = new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace");
+    zkPlacementStateManager = new ZKPlacementStateManager(uri, new DistributedLogConfiguration(),
NullStatsLogger.INSTANCE);
+  }
+
+  @Test
+  public void testSaveLoad() throws Exception {
+    TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
+    zkPlacementStateManager.saveOwnership(ownerships);
+    SortedSet<ServerLoad> loadedOwnerships = zkPlacementStateManager.loadOwnership();
+    assertEquals(ownerships, loadedOwnerships);
+
+    ownerships.add(new ServerLoad("emptyServer"));
+    zkPlacementStateManager.saveOwnership(ownerships);
+    loadedOwnerships = zkPlacementStateManager.loadOwnership();
+    assertEquals(ownerships, loadedOwnerships);
+
+    ServerLoad sl1 = new ServerLoad("server1");
+    sl1.addStream(new StreamLoad("stream1", 3));
+    sl1.addStream(new StreamLoad("stream2", 4));
+    ServerLoad sl2 = new ServerLoad("server2");
+    sl2.addStream(new StreamLoad("stream3", 1));
+    ownerships.add(sl1);
+    ownerships.add(sl2);
+    zkPlacementStateManager.saveOwnership(ownerships);
+    loadedOwnerships = zkPlacementStateManager.loadOwnership();
+    assertEquals(ownerships, loadedOwnerships);
+
+    loadedOwnerships.remove(sl1);
+    zkPlacementStateManager.saveOwnership(ownerships);
+    loadedOwnerships = zkPlacementStateManager.loadOwnership();
+    assertEquals(ownerships, loadedOwnerships);
+  }
+
+  @Test
+  public void testWatchIndefinitely() throws Exception {
+    TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
+    ownerships.add(new ServerLoad("server1"));
+    PlacementStateManager.PlacementCallback callback = mock(PlacementStateManager.PlacementCallback.class);
+    zkPlacementStateManager.saveOwnership(ownerships); // need to initialize the zk path
before watching
+    zkPlacementStateManager.watch(callback);
+    // cannot verify the callback here as it may call before the verify is called
+
+    zkPlacementStateManager.saveOwnership(ownerships);
+    verify(callback, timeout(1000)).callback(ownerships);
+
+    ServerLoad server2 = new ServerLoad("server2");
+    server2.addStream(new StreamLoad("hella-important-stream", 415));
+    ownerships.add(server2);
+    zkPlacementStateManager.saveOwnership(ownerships);
+    verify(callback, timeout(1000)).callback(ownerships);
+
+    server2.removeStream("server1");
+    zkPlacementStateManager.saveOwnership(ownerships);
+    verify(callback, timeout(1000)).callback(ownerships);
+  }
+
+  @Test
+  public void testZkFormatting() throws Exception {
+    final String server = "smf1-eci-41-sr1.prod.twitter.com/10.70.186.139:31351";
+    final String zkFormattedServer = "smf1-eci-41-sr1.prod.twitter.com--10.70.186.139:31351";
+    URI uri = new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace");
+    ZKPlacementStateManager zkPlacementStateManager = new ZKPlacementStateManager(uri, new
DistributedLogConfiguration(), NullStatsLogger.INSTANCE);
+    assertEquals(zkFormattedServer, zkPlacementStateManager.serverToZkFormat(server));
+    assertEquals(server, zkPlacementStateManager.zkFormatToServer(zkFormattedServer));
+  }
+
+  @After
+  public void stopZookeeper() throws IOException {
+    zkTestServer.stop();
+  }
+}


Mime
View raw message