distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [31/31] incubator-distributedlog git commit: Address a few flaky test cases
Date Fri, 30 Dec 2016 00:07:45 GMT
Address a few flaky test cases


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/f607a48f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/f607a48f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/f607a48f

Branch: refs/heads/master
Commit: f607a48ff10499e3bb764a44cab8619c355d3bf4
Parents: 63d6bde
Author: Sijie Guo <sijieg@twitter.com>
Authored: Thu Dec 29 15:16:20 2016 -0800
Committer: Sijie Guo <sijieg@twitter.com>
Committed: Thu Dec 29 16:06:20 2016 -0800

----------------------------------------------------------------------
 .../service/stream/StreamImpl.java              | 48 +++++++++----------
 .../service/TestDistributedLogService.java      | 22 +++++++--
 .../placement/TestLeastLoadPlacementPolicy.java | 11 +++--
 .../service/placement/TestServerLoad.java       |  4 +-
 .../service/placement/TestStreamLoad.java       |  2 +-
 .../placement/TestZKPlacementStateManager.java  | 50 ++++++++++++--------
 6 files changed, 81 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f607a48f/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
index 9f049c8..36904fd 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
@@ -82,7 +82,7 @@ public class StreamImpl implements Stream {
      * any error, the stream should be put in error state. If a stream is in error state,
      * it should be removed and not reused anymore.
      */
-    public static enum StreamStatus {
+    public enum StreamStatus {
         UNINITIALIZED(-1),
         INITIALIZING(0),
         INITIALIZED(1),
@@ -101,7 +101,7 @@ public class StreamImpl implements Stream {
             return code;
         }
 
-        static boolean isUnavailable(StreamStatus status) {
+        public static boolean isUnavailable(StreamStatus status) {
             return StreamStatus.ERROR == status || StreamStatus.CLOSING == status || StreamStatus.CLOSED == status;
         }
     }
@@ -763,23 +763,7 @@ public class StreamImpl implements Stream {
         // we will fail the requests that are coming in between closing and closed only
         // after the async writer is closed. so we could clear up the lock before redirect
         // them.
-        close(abort);
-        unregisterGauge();
-        if (uncache) {
-            final long probationTimeoutMs;
-            if (null != owner) {
-                probationTimeoutMs = 2 * dlConfig.getZKSessionTimeoutMilliseconds() / 3;
-            } else {
-                probationTimeoutMs = 0L;
-            }
-            closePromise.onSuccess(new AbstractFunction1<Void, BoxedUnit>() {
-                @Override
-                public BoxedUnit apply(Void result) {
-                    streamManager.scheduleRemoval(StreamImpl.this, probationTimeoutMs);
-                    return BoxedUnit.UNIT;
-                }
-            });
-        }
+        close(abort, uncache);
         return closePromise;
     }
 
@@ -799,11 +783,29 @@ public class StreamImpl implements Stream {
     }
 
     /**
+     * Post action executed after closing.
+     */
+    private void postClose(boolean uncache) {
+        closeManagerAndErrorOutPendingRequests();
+        unregisterGauge();
+        if (uncache) {
+            if (null != owner) {
+                long probationTimeoutMs = 2 * dlConfig.getZKSessionTimeoutMilliseconds() / 3;
+                streamManager.scheduleRemoval(this, probationTimeoutMs);
+            } else {
+                streamManager.notifyRemoved(this);
+                logger.info("Removed cached stream {}.", getStreamName());
+            }
+        }
+        FutureUtils.setValue(closePromise, null);
+    }
+
+    /**
      * Shouldn't call close directly. The callers should call #requestClose instead
      *
      * @param shouldAbort shall we abort the stream instead of closing
      */
-    private Future<Void> close(boolean shouldAbort) {
+    private Future<Void> close(boolean shouldAbort, final boolean uncache) {
         boolean abort;
         closeLock.writeLock().lock();
         try {
@@ -841,16 +843,14 @@ public class StreamImpl implements Stream {
                 new FutureEventListener<Void>() {
                     @Override
                     public void onSuccess(Void value) {
-                        closeManagerAndErrorOutPendingRequests();
-                        FutureUtils.setValue(closePromise, null);
+                        postClose(uncache);
                     }
                     @Override
                     public void onFailure(Throwable cause) {
                         if (cause instanceof TimeoutException) {
                             writerCloseTimeoutCounter.inc();
                         }
-                        closeManagerAndErrorOutPendingRequests();
-                        FutureUtils.setValue(closePromise, null);
+                        postClose(uncache);
                     }
                 }, scheduler, name));
         return closePromise;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f607a48f/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
index 1bfe352..f97399d 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
@@ -211,8 +211,9 @@ public class TestDistributedLogService extends TestDistributedLogBase {
         WriteResponse wr1 = Await.result(op1.result());
         assertEquals("Op 1 should fail",
                 StatusCode.FOUND, wr1.getHeader().getCode());
-        assertEquals("Service 1 should be in ERROR state",
-                StreamStatus.ERROR, s1.getStatus());
+        // the stream will be set to ERROR and then be closed.
+        assertTrue("Service 1 should be in unavailable state",
+                StreamStatus.isUnavailable(s1.getStatus()));
         assertNotNull(s1.getManager());
         assertNull(s1.getWriter());
         assertNotNull(s1.getLastException());
@@ -527,15 +528,26 @@ public class TestDistributedLogService extends TestDistributedLogBase {
     public void testStreamOpNoChecksum() throws Exception {
         DistributedLogServiceImpl localService = createConfiguredLocalService();
         WriteContext ctx = new WriteContext();
-        Future<WriteResponse> result = localService.release("test", ctx);
+        HeartbeatOptions option = new HeartbeatOptions();
+        option.setSendHeartBeatToReader(true);
+
+        // hearbeat to acquire the stream and then release the stream
+        Future<WriteResponse> result = localService.heartbeatWithOptions("test", ctx, option);
         WriteResponse resp = Await.result(result);
         assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
-        result = localService.delete("test", ctx);
+        result = localService.release("test", ctx);
+        resp = Await.result(result);
+        assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
+
+        // heartbeat to acquire the stream and then delete the stream
+        result = localService.heartbeatWithOptions("test", ctx, option);
         resp = Await.result(result);
         assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
-        result = localService.heartbeat("test", ctx);
+        result = localService.delete("test", ctx);
         resp = Await.result(result);
         assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
+
+        // shutdown the local service
         localService.shutdown();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f607a48f/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
index ab4eeae..bde33c6 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
@@ -55,7 +55,7 @@ import static org.mockito.Mockito.when;
 
 public class TestLeastLoadPlacementPolicy {
 
-  @Test
+  @Test(timeout = 10000)
   public void testCalculateBalances() throws Exception {
     int numSevers = new Random().nextInt(20) + 1;
     int numStreams = new Random().nextInt(200) + 1;
@@ -73,7 +73,7 @@ public class TestLeastLoadPlacementPolicy {
     }
   }
 
-  @Test
+  @Test(timeout = 10000)
   public void testRefreshAndPlaceStream() throws Exception {
     int numSevers = new Random().nextInt(20) + 1;
     int numStreams = new Random().nextInt(200) + 1;
@@ -98,7 +98,7 @@ public class TestLeastLoadPlacementPolicy {
     assertEquals(next.getServer(), serverPlacement);
   }
 
-  @Test
+  @Test(timeout = 10000)
   public void testCalculateUnequalWeight() throws Exception {
     int numSevers = new Random().nextInt(20) + 1;
     int numStreams = new Random().nextInt(200) + 1;
@@ -131,7 +131,10 @@ public class TestLeastLoadPlacementPolicy {
         highestLoadSeen = load;
       }
     }
-    assertTrue(highestLoadSeen - lowestLoadSeen < maxLoad.get());
+    assertTrue("Unexpected placement for " + numStreams + " streams to "
+            + numSevers + " servers : highest load = " + highestLoadSeen
+            + ", lowest load = " + lowestLoadSeen + ", max stream load = " + maxLoad.get(),
+            highestLoadSeen - lowestLoadSeen < maxLoad.get());
   }
 
   private Set<SocketAddress> generateSocketAddresses(int num) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f607a48f/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java
index bbd7e72..d844f78 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java
@@ -25,7 +25,7 @@ import static org.junit.Assert.assertEquals;
 
 public class TestServerLoad {
 
-  @Test
+  @Test(timeout = 60000)
   public void testSerializeDeserialize() throws IOException {
     final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3");
     for (int i = 0; i < 20; i++) {
@@ -34,7 +34,7 @@ public class TestServerLoad {
     assertEquals(serverLoad, ServerLoad.deserialize(serverLoad.serialize()));
   }
 
-  @Test
+  @Test(timeout = 60000)
   public void testGetLoad() throws IOException {
     final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3");
     assertEquals(0, serverLoad.getLoad());

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f607a48f/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java
index 3a3e5c0..e5091f5 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java
@@ -25,7 +25,7 @@ import static org.junit.Assert.assertEquals;
 
 public class TestStreamLoad {
 
-  @Test
+  @Test(timeout = 10000)
   public void testSerializeDeserialize() throws IOException {
     final String streamName = "aHellaRandomStreamName";
     final int load = 1337;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f607a48f/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java
index b104952..c02492d 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.curator.test.TestingServer;
@@ -30,15 +31,9 @@ import org.junit.Test;
 
 import com.twitter.distributedlog.DistributedLogConfiguration;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
 import static com.twitter.distributedlog.LocalDLMEmulator.DLOG_NAMESPACE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 public class TestZKPlacementStateManager {
   private TestingServer zkTestServer;
@@ -54,7 +49,7 @@ public class TestZKPlacementStateManager {
     zkPlacementStateManager = new ZKPlacementStateManager(uri, new DistributedLogConfiguration(), NullStatsLogger.INSTANCE);
   }
 
-  @Test
+  @Test(timeout = 60000)
   public void testSaveLoad() throws Exception {
     TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
     zkPlacementStateManager.saveOwnership(ownerships);
@@ -83,33 +78,48 @@ public class TestZKPlacementStateManager {
     assertEquals(ownerships, loadedOwnerships);
   }
 
-  @Test
+  private TreeSet<ServerLoad> waitForServerLoadsNotificationAsc(
+          LinkedBlockingQueue<TreeSet<ServerLoad>> notificationQueue,
+          int expectedNumServerLoads) throws InterruptedException {
+    TreeSet<ServerLoad> notification = notificationQueue.take();
+    assertNotNull(notification);
+    while (notification.size() < expectedNumServerLoads) {
+      notification = notificationQueue.take();
+    }
+    assertEquals(expectedNumServerLoads, notification.size());
+    return notification;
+  }
+
+  @Test(timeout = 60000)
   public void testWatchIndefinitely() throws Exception {
     TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
     ownerships.add(new ServerLoad("server1"));
-    PlacementStateManager.PlacementCallback callback = mock(PlacementStateManager.PlacementCallback.class);
+    final LinkedBlockingQueue<TreeSet<ServerLoad>> serverLoadNotifications =
+            new LinkedBlockingQueue<TreeSet<ServerLoad>>();
+    PlacementStateManager.PlacementCallback callback = new PlacementStateManager.PlacementCallback() {
+      @Override
+      public void callback(TreeSet<ServerLoad> serverLoads) {
+        serverLoadNotifications.add(serverLoads);
+      }
+    };
     zkPlacementStateManager.saveOwnership(ownerships); // need to initialize the zk path before watching
     zkPlacementStateManager.watch(callback);
     // cannot verify the callback here as it may call before the verify is called
 
     zkPlacementStateManager.saveOwnership(ownerships);
-    verify(callback, timeout(1000)).callback(ownerships);
+    assertEquals(ownerships, waitForServerLoadsNotificationAsc(serverLoadNotifications, 1));
 
     ServerLoad server2 = new ServerLoad("server2");
     server2.addStream(new StreamLoad("hella-important-stream", 415));
     ownerships.add(server2);
     zkPlacementStateManager.saveOwnership(ownerships);
-    verify(callback, timeout(1000)).callback(ownerships);
-
-    server2.removeStream("server1");
-    zkPlacementStateManager.saveOwnership(ownerships);
-    verify(callback, timeout(1000)).callback(ownerships);
+    assertEquals(ownerships, waitForServerLoadsNotificationAsc(serverLoadNotifications, 2));
   }
 
-  @Test
+  @Test(timeout = 60000)
   public void testZkFormatting() throws Exception {
-    final String server = "smf1-eci-41-sr1.prod.twitter.com/10.70.186.139:31351";
-    final String zkFormattedServer = "smf1-eci-41-sr1.prod.twitter.com--10.70.186.139:31351";
+    final String server = "host/10.0.0.0:31351";
+    final String zkFormattedServer = "host--10.0.0.0:31351";
     URI uri = new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace");
     ZKPlacementStateManager zkPlacementStateManager = new ZKPlacementStateManager(uri, new DistributedLogConfiguration(), NullStatsLogger.INSTANCE);
     assertEquals(zkFormattedServer, zkPlacementStateManager.serverToZkFormat(server));


Mime
View raw message