ratis-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject incubator-ratis git commit: RATIS-438. RaftBasicTests.testWithLoad may fail.
Date Sat, 01 Dec 2018 20:50:19 GMT
Repository: incubator-ratis
Updated Branches:
  refs/heads/master 959d493c0 -> 609773e03


RATIS-438. RaftBasicTests.testWithLoad may fail.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/609773e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/609773e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/609773e0

Branch: refs/heads/master
Commit: 609773e03c96f733829a93cc5bbcc3febd25408d
Parents: 959d493
Author: Tsz Wo Nicholas Sze <szetszwo@apache.org>
Authored: Sat Dec 1 12:50:04 2018 -0800
Committer: Tsz Wo Nicholas Sze <szetszwo@apache.org>
Committed: Sat Dec 1 12:50:04 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/ratis/util/JavaUtils.java   |  19 +-
 .../java/org/apache/ratis/RaftAsyncTests.java   | 186 +++++++++----------
 .../java/org/apache/ratis/RaftBasicTests.java   |  92 +++++----
 .../java/org/apache/ratis/RaftTestUtil.java     |  10 +-
 .../SimpleStateMachine4Testing.java             |   6 +-
 .../ratis/grpc/TestRaftAsyncWithGrpc.java       |   2 +-
 .../org/apache/ratis/grpc/TestRaftWithGrpc.java |  42 ++---
 7 files changed, 187 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/609773e0/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index f3b0a0d..769e12f 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -72,6 +72,21 @@ public interface JavaUtils {
     return trace[3];
   }
 
+  static <T extends Throwable> void runAsUnchecked(CheckedRunnable<T> runnable)
{
+    runAsUnchecked(runnable::run, RuntimeException::new);
+  }
+
+  static <THROWABLE extends Throwable> void runAsUnchecked(
+      CheckedRunnable<THROWABLE> runnable, Function<THROWABLE, ? extends RuntimeException>
converter) {
+    try {
+      runnable.run();
+    } catch(RuntimeException | Error e) {
+      throw e;
+    } catch(Throwable t) {
+      throw converter.apply(cast(t));
+    }
+  }
+
   /**
    * Invoke {@link Callable#call()} and, if there any,
    * wrap the checked exception by {@link RuntimeException}.
@@ -88,9 +103,7 @@ public interface JavaUtils {
     } catch(RuntimeException | Error e) {
       throw e;
     } catch(Throwable t) {
-      @SuppressWarnings("unchecked")
-      final THROWABLE casted = (THROWABLE)t;
-      throw converter.apply(casted);
+      throw converter.apply(cast(t));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/609773e0/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index e459e7c..0719976 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -18,26 +18,31 @@
 package org.apache.ratis;
 
 import org.apache.log4j.Level;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.impl.RaftClientTestUtil;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.*;
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RaftRetryFailureException;
+import org.apache.ratis.protocol.StateMachineException;
 import org.apache.ratis.retry.RetryPolicies;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -45,22 +50,23 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
-import java.util.concurrent.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.ratis.RaftTestUtil.waitForLeader;
 
 public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends BaseTest
     implements MiniRaftCluster.Factory.Get<CLUSTER> {
-  static {
+  {
     LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
     LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
   }
 
   public static final int NUM_SERVERS = 3;
 
-  @Before
-  public void setup() {
+  {
     getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
         SimpleStateMachine4Testing.class, StateMachine.class);
   }
@@ -91,42 +97,43 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster>
extends Ba
 
   @Test
   public void testRequestAsyncWithRetryPolicy() throws Exception {
-    LOG.info("Running testWatchRequestsWithRetryPolicy");
-    try(final CLUSTER cluster = newCluster(NUM_SERVERS)) {
-     int maxRetries = 3;
-      final RetryPolicy retryPolicy = RetryPolicies
-          .retryUpToMaximumCountWithFixedSleep(maxRetries, TimeDuration.valueOf(1, TimeUnit.SECONDS));
-      cluster.start();
-      final RaftClient writeClient =
-          cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId(), retryPolicy);
+    runWithNewCluster(NUM_SERVERS, this::runTestRequestAsyncWithRetryPolicy);
+  }
+
+  void runTestRequestAsyncWithRetryPolicy(CLUSTER cluster) throws Exception {
+    final RetryPolicy retryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+        3, TimeDuration.valueOf(1, TimeUnit.SECONDS));
+    final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+
+    try(final RaftClient writeClient = cluster.createClient(leader.getId(), retryPolicy))
{
       // blockStartTransaction of the leader so that no transaction can be committed MAJORITY
-      final RaftServerImpl leader = cluster.getLeader();
       LOG.info("block leader {}", leader.getId());
       SimpleStateMachine4Testing.get(leader).blockStartTransaction();
-      RaftClientReply reply =
-          writeClient.sendAsync(RaftTestUtil.SimpleMessage.create(1)[0]).get();
+      final SimpleMessage[] messages = SimpleMessage.create(2);
+      final RaftClientReply reply = writeClient.sendAsync(messages[0]).get();
       RaftRetryFailureException rfe = reply.getRetryFailureException();
-      Assert.assertTrue(rfe != null);
+      Assert.assertNotNull(rfe);
       Assert.assertTrue(rfe.getMessage().contains(retryPolicy.toString()));
+
       // unblock leader so that the next transaction can be committed.
       SimpleStateMachine4Testing.get(leader).unblockStartTransaction();
       // make sure the the next request succeeds. This will ensure the first
       // request completed
-      writeClient.sendAsync(RaftTestUtil.SimpleMessage.create(1)[0]).get();
-      }
+      writeClient.sendAsync(messages[1]).get();
     }
+  }
 
   @Test
   public void testAsyncRequestSemaphore() throws Exception {
-    LOG.info("Running testAsyncRequestSemaphore");
-    final CLUSTER cluster = newCluster(NUM_SERVERS);
-    Assert.assertNull(cluster.getLeader());
-    cluster.start();
+    runWithNewCluster(NUM_SERVERS, this::runTestAsyncRequestSemaphore);
+  }
+
+  void runTestAsyncRequestSemaphore(CLUSTER cluster) throws Exception {
     waitForLeader(cluster);
 
     int numMessages = RaftClientConfigKeys.Async.maxOutstandingRequests(getProperties());
     CompletableFuture[] futures = new CompletableFuture[numMessages + 1];
-    final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numMessages);
+    final SimpleMessage[] messages = SimpleMessage.create(numMessages);
     final RaftClient client = cluster.createClient();
     //Set blockTransaction flag so that transaction blocks
     cluster.getServers().stream()
@@ -141,11 +148,11 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster>
extends Ba
       futures[i] = client.sendAsync(messages[i]);
       blockedRequestsCount.decrementAndGet();
     }
-    Assert.assertTrue(blockedRequestsCount.get() == 0);
+    Assert.assertEquals(0, blockedRequestsCount.get());
 
     futures[numMessages] = CompletableFuture.supplyAsync(() -> {
       blockedRequestsCount.incrementAndGet();
-      client.sendAsync(new RaftTestUtil.SimpleMessage("n1"));
+      client.sendAsync(new SimpleMessage("n1"));
       blockedRequestsCount.decrementAndGet();
       return null;
     });
@@ -154,7 +161,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster>
extends Ba
     while (blockedRequestsCount.get() != 1) {
       Thread.sleep(1000);
     }
-    Assert.assertTrue(blockedRequestsCount.get() == 1);
+    Assert.assertEquals(1, blockedRequestsCount.get());
     //Since all semaphore permits are acquired the last message sent is in queue
     RaftClientTestUtil.assertAsyncRequestSemaphore(client, 0, 1);
 
@@ -167,19 +174,12 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster>
extends Ba
     for(int i=0; i<=numMessages; i++){
       futures[i].join();
     }
-    Assert.assertTrue(blockedRequestsCount.get() == 0);
-    cluster.shutdown();
+    Assert.assertEquals(0, blockedRequestsCount.get());
   }
 
   void runTestBasicAppendEntriesAsync(boolean killLeader) throws Exception {
-    final CLUSTER cluster = newCluster(killLeader? 5: 3);
-    try {
-      cluster.start();
-      waitForLeader(cluster);
-      RaftBasicTests.runTestBasicAppendEntries(true, killLeader, 100, cluster, LOG);
-    } finally {
-      cluster.shutdown();
-    }
+    runWithNewCluster(killLeader? 5: 3,
+        cluster -> RaftBasicTests.runTestBasicAppendEntries(true, killLeader, 100, cluster,
LOG));
   }
 
   @Test
@@ -194,21 +194,18 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster>
extends Ba
 
   @Test
   public void testWithLoadAsync() throws Exception {
-    LOG.info("Running testWithLoadAsync");
-    final CLUSTER cluster = newCluster(NUM_SERVERS);
-    cluster.start();
-    waitForLeader(cluster);
-    RaftBasicTests.testWithLoad(10, 500, true, cluster, LOG);
-    cluster.shutdown();
+    runWithNewCluster(NUM_SERVERS,
+        cluster -> RaftBasicTests.testWithLoad(10, 500, true, cluster, LOG));
   }
 
   @Test
   public void testStaleReadAsync() throws Exception {
-    final int numMesssages = 10;
-    final CLUSTER cluster = newCluster(NUM_SERVERS);
+    runWithNewCluster(NUM_SERVERS, this::runTestStaleReadAsync);
+  }
 
+  void runTestStaleReadAsync(CLUSTER cluster) throws Exception {
+    final int numMesssages = 10;
     try (RaftClient client = cluster.createClient()) {
-      cluster.start();
       RaftTestUtil.waitForLeader(cluster);
 
       // submit some messages
@@ -216,17 +213,19 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster>
extends Ba
       for (int i = 0; i < numMesssages; i++) {
         final String s = "" + i;
         LOG.info("sendAsync " + s);
-        futures.add(client.sendAsync(new RaftTestUtil.SimpleMessage(s)));
+        futures.add(client.sendAsync(new SimpleMessage(s)));
       }
       Assert.assertEquals(numMesssages, futures.size());
-      RaftClientReply lastWriteReply = null;
+      final List<RaftClientReply> replies = new ArrayList<>();
       for (CompletableFuture<RaftClientReply> f : futures) {
-        lastWriteReply = f.join();
-        Assert.assertTrue(lastWriteReply.isSuccess());
+        final RaftClientReply r = f.join();
+        Assert.assertTrue(r.isSuccess());
+        replies.add(r);
       }
       futures.clear();
 
       // Use a follower with the max commit index
+      final RaftClientReply lastWriteReply = replies.get(replies.size() - 1);
       final RaftPeerId leader = lastWriteReply.getServerId();
       LOG.info("leader = " + leader);
       final Collection<CommitInfoProto> commitInfos = lastWriteReply.getCommitInfos();
@@ -235,70 +234,72 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster>
extends Ba
           .filter(info -> !RaftPeerId.valueOf(info.getServer().getId()).equals(leader))
           .max(Comparator.comparing(CommitInfoProto::getCommitIndex)).get();
       final RaftPeerId follower = RaftPeerId.valueOf(followerCommitInfo.getServer().getId());
-      LOG.info("max follower = " + follower);
+      final long followerCommitIndex = followerCommitInfo.getCommitIndex();
+      LOG.info("max follower = {}, commitIndex = {}", follower, followerCommitIndex);
 
       // test a failure case
       testFailureCaseAsync("sendStaleReadAsync(..) with a larger commit index",
           () -> client.sendStaleReadAsync(
-              new RaftTestUtil.SimpleMessage("" + Long.MAX_VALUE),
+              new SimpleMessage("" + Long.MAX_VALUE),
               followerCommitInfo.getCommitIndex(), follower),
           StateMachineException.class, IndexOutOfBoundsException.class);
 
       // test sendStaleReadAsync
       for (int i = 0; i < numMesssages; i++) {
-        final int query = i;
-        LOG.info("sendStaleReadAsync, query=" + query);
-        final Message message = new RaftTestUtil.SimpleMessage("" + query);
+        final RaftClientReply reply = replies.get(i);
+        final String query = "" + i;
+        LOG.info("query=" + query + ", reply=" + reply);
+        final Message message = new SimpleMessage(query);
         final CompletableFuture<RaftClientReply> readFuture = client.sendReadOnlyAsync(message);
-        final CompletableFuture<RaftClientReply> staleReadFuture = client.sendStaleReadAsync(
-            message, followerCommitInfo.getCommitIndex(), follower);
-
-        futures.add(readFuture.thenApply(r -> getMessageContent(r))
-            .thenCombine(staleReadFuture.thenApply(r -> getMessageContent(r)), (expected,
computed) -> {
-              try {
-                LOG.info("query " + query + " returns "
-                    + LogEntryProto.parseFrom(expected).getStateMachineLogEntry().getLogData().toStringUtf8());
-              } catch (InvalidProtocolBufferException e) {
-                throw new CompletionException(e);
-              }
-
-              Assert.assertEquals("log entry mismatch for query=" + query, expected, computed);
-              return null;
-            })
-        );
+
+        futures.add(readFuture.thenCompose(r -> {
+          if (reply.getLogIndex() <= followerCommitIndex) {
+            LOG.info("sendStaleReadAsync, query=" + query);
+            return client.sendStaleReadAsync(message, followerCommitIndex, follower);
+          } else {
+            return CompletableFuture.completedFuture(null);
+          }
+        }).thenApply(staleReadReply -> {
+          if (staleReadReply == null) {
+            return null;
+          }
+
+          final ByteString expected = readFuture.join().getMessage().getContent();
+          final ByteString computed = staleReadReply.getMessage().getContent();
+          try {
+            LOG.info("query " + query + " returns "
+                + LogEntryProto.parseFrom(expected).getStateMachineLogEntry().getLogData().toStringUtf8());
+          } catch (InvalidProtocolBufferException e) {
+            throw new CompletionException(e);
+          }
+
+          Assert.assertEquals("log entry mismatch for query=" + query, expected, computed);
+          return null;
+        }));
       }
       JavaUtils.allOf(futures).join();
-    } finally {
-      cluster.shutdown();
     }
   }
 
-  static ByteString getMessageContent(RaftClientReply reply) {
-    Assert.assertTrue(reply.isSuccess());
-    return reply.getMessage().getContent();
-  }
-
   @Test
   public void testRequestTimeout() throws Exception {
     final TimeDuration oldExpiryTime = RaftServerConfigKeys.RetryCache.expiryTime(getProperties());
     RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(5,
TimeUnit.SECONDS));
-    final CLUSTER cluster = newCluster(NUM_SERVERS);
-    cluster.start();
-    RaftBasicTests.testRequestTimeout(true, cluster, LOG);
-    cluster.shutdown();
+    runWithNewCluster(NUM_SERVERS, cluster -> RaftBasicTests.testRequestTimeout(true,
cluster, LOG));
 
     //reset for the other tests
     RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), oldExpiryTime);
   }
 
   @Test
-  public void testAppendEntriesTimeout()
-      throws IOException, InterruptedException, ExecutionException {
+  public void testAppendEntriesTimeout() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestAppendEntriesTimeout);
+  }
+
+  void runTestAppendEntriesTimeout(CLUSTER cluster) throws Exception {
     LOG.info("Running testAppendEntriesTimeout");
     final TimeDuration oldExpiryTime = RaftServerConfigKeys.RetryCache.expiryTime(getProperties());
     RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(20,
TimeUnit.SECONDS));
-    final CLUSTER cluster = newCluster(NUM_SERVERS);
-    cluster.start();
     waitForLeader(cluster);
     long time = System.currentTimeMillis();
     long waitTime = 5000;
@@ -309,7 +310,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster>
extends Ba
           .map(SimpleStateMachine4Testing::get)
           .forEach(SimpleStateMachine4Testing::blockWriteStateMachineData);
 
-      CompletableFuture<RaftClientReply> replyFuture = client.sendAsync(new RaftTestUtil.SimpleMessage("abc"));
+      CompletableFuture<RaftClientReply> replyFuture = client.sendAsync(new SimpleMessage("abc"));
       Thread.sleep(waitTime);
       // replyFuture should not be completed until append request is unblocked.
       Assert.assertTrue(!replyFuture.isDone());
@@ -322,7 +323,6 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster>
extends Ba
       replyFuture.get();
       Assert.assertTrue(System.currentTimeMillis() - time > waitTime);
     }
-    cluster.shutdown();
 
     //reset for the other tests
     RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), oldExpiryTime);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/609773e0/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index f37fc21..90cc627 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -35,7 +35,6 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.ExitUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
-import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.TimeDuration;
 import org.junit.Assert;
 import org.junit.Test;
@@ -54,10 +53,7 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.ratis.RaftTestUtil.logEntriesContains;
-import static org.apache.ratis.RaftTestUtil.sendMessageInNewThread;
 import static org.apache.ratis.RaftTestUtil.waitForLeader;
-import static org.junit.Assert.assertTrue;
 
 public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
     extends BaseTest
@@ -67,38 +63,39 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
     LogUtils.setLogLevel(RaftServerTestUtil.getStateMachineUpdaterLog(), Level.DEBUG);
     LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
 
-    RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration
-        .valueOf(5, TimeUnit.SECONDS));
+    RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(5,
TimeUnit.SECONDS));
   }
 
   public static final int NUM_SERVERS = 5;
 
   @Test
   public void testBasicAppendEntries() throws Exception {
-    try(CLUSTER cluster = newCluster(NUM_SERVERS)) {
-      cluster.start();
-      runTestBasicAppendEntries(false, false, 10, cluster, LOG);
-    }
+    runWithNewCluster(NUM_SERVERS, cluster ->
+        runTestBasicAppendEntries(false, false, 10, cluster, LOG));
   }
 
   @Test
   public void testBasicAppendEntriesKillLeader() throws Exception {
-    try(CLUSTER cluster = newCluster(NUM_SERVERS)) {
-      cluster.start();
-      runTestBasicAppendEntries(false, true, 10, cluster, LOG);
-    }
+    runWithNewCluster(NUM_SERVERS, cluster ->
+        runTestBasicAppendEntries(false, true, 10, cluster, LOG));
   }
 
-  static void killAndRestartServer(RaftPeerId id, long killSleepMs, long restartSleepMs,
MiniRaftCluster cluster, Logger LOG) {
-    try {
-      Thread.sleep(killSleepMs);
-      cluster.killServer(id);
-      Thread.sleep(restartSleepMs);
-      LOG.info("restart server: " + id);
-      cluster.restartServer(id, false);
-    } catch (Exception e) {
-      ExitUtils.terminate(-1, "Failed to kill/restart server: " + id, e, LOG);
-    }
+  static CompletableFuture<Void> killAndRestartServer(
+      RaftPeerId id, long killSleepMs, long restartSleepMs, MiniRaftCluster cluster, Logger
LOG) {
+    final CompletableFuture<Void> future = new CompletableFuture<>();
+    new Thread(() -> {
+      try {
+        Thread.sleep(killSleepMs);
+        cluster.killServer(id);
+        Thread.sleep(restartSleepMs);
+        LOG.info("restart server: " + id);
+        cluster.restartServer(id, false);
+        future.complete(null);
+      } catch (Exception e) {
+        ExitUtils.terminate(-1, "Failed to kill/restart server: " + id, e, LOG);
+      }
+    }).start();
+    return future;
   }
 
   static void runTestBasicAppendEntries(
@@ -112,10 +109,14 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
     RaftServerImpl leader = waitForLeader(cluster);
     final long term = leader.getState().getCurrentTerm();
 
-    new Thread(() -> killAndRestartServer(cluster.getFollowers().get(0).getId(), 0, 1000,
cluster, LOG)).start();
+    final CompletableFuture<Void> killAndRestartFollower = killAndRestartServer(
+        cluster.getFollowers().get(0).getId(), 0, 1000, cluster, LOG);
+    final CompletableFuture<Void> killAndRestartLeader;
     if (killLeader) {
       LOG.info("killAndRestart leader " + leader.getId());
-      new Thread(() -> killAndRestartServer(leader.getId(), 2000, 4000, cluster, LOG)).start();
+      killAndRestartLeader = killAndRestartServer(leader.getId(), 2000, 4000, cluster, LOG);
+    } else {
+      killAndRestartLeader = CompletableFuture.completedFuture(null);
     }
 
     LOG.info(cluster.printServers());
@@ -138,7 +139,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
           });
         } else {
           final RaftClientReply reply = client.send(message);
-          Preconditions.assertTrue(reply.isSuccess());
+          Assert.assertTrue(reply.isSuccess());
         }
       }
       if (async) {
@@ -148,6 +149,8 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
     }
     Thread.sleep(cluster.getTimeoutMax().toInt(TimeUnit.MILLISECONDS) + 100);
     LOG.info(cluster.printAllLogs());
+    killAndRestartFollower.join();
+    killAndRestartLeader.join();
 
     for(RaftServerProxy server : cluster.getServers()) {
       final RaftServerImpl impl = RaftServerTestUtil.getRaftServerImpl(server, cluster.getGroupId());
@@ -161,9 +164,10 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
 
   @Test
   public void testOldLeaderCommit() throws Exception {
-    LOG.info("Running testOldLeaderCommit");
-    final CLUSTER cluster = newCluster(NUM_SERVERS);
-    cluster.start();
+    runWithNewCluster(NUM_SERVERS, this::runTestOldLeaderCommit);
+  }
+
+  void runTestOldLeaderCommit(CLUSTER cluster) throws Exception {
     final RaftServerImpl leader = waitForLeader(cluster);
     final RaftPeerId leaderId = leader.getId();
     final long term = leader.getState().getCurrentTerm();
@@ -180,7 +184,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
 
     Thread.sleep(cluster.getMaxTimeout() + 100);
     RaftLog followerLog = followerToSendLog.getState().getLog();
-    assertTrue(logEntriesContains(followerLog, messages));
+    Assert.assertTrue(RaftTestUtil.logEntriesContains(followerLog, messages));
 
     LOG.info(String.format("killing old leader: %s", leaderId.toString()));
     cluster.killServer(leaderId);
@@ -198,15 +202,14 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
 
     cluster.getServerAliveStream().map(s -> s.getState().getLog())
         .forEach(log -> RaftTestUtil.assertLogEntries(log, term, messages));
-    LOG.info("terminating testOldLeaderCommit test");
-    cluster.shutdown();
   }
 
   @Test
   public void testOldLeaderNotCommit() throws Exception {
-    LOG.info("Running testOldLeaderNotCommit");
-    final CLUSTER cluster = newCluster(NUM_SERVERS);
-    cluster.start();
+    runWithNewCluster(NUM_SERVERS, this::runTestOldLeaderNotCommit);
+  }
+
+  void runTestOldLeaderNotCommit(CLUSTER cluster) throws Exception {
     final RaftPeerId leaderId = waitForLeader(cluster).getId();
 
     List<RaftServerImpl> followers = cluster.getFollowers();
@@ -217,10 +220,10 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
     }
 
     SimpleMessage[] messages = SimpleMessage.create(1);
-    sendMessageInNewThread(cluster, messages);
+    RaftTestUtil.sendMessageInNewThread(cluster, messages);
 
     Thread.sleep(cluster.getMaxTimeout() + 100);
-    logEntriesContains(followerToCommit.getState().getLog(), messages);
+    RaftTestUtil.logEntriesContains(followerToCommit.getState().getLog(), messages);
 
     cluster.killServer(leaderId);
     cluster.killServer(followerToCommit.getId());
@@ -236,7 +239,6 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
     cluster.getServerAliveStream()
             .map(s -> s.getState().getLog())
             .forEach(log -> RaftTestUtil.checkLogEntries(log, messages, predicate));
-    cluster.shutdown();
   }
 
   static class Client4TestWithLoad extends Thread {
@@ -318,13 +320,10 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
 
   @Test
   public void testWithLoad() throws Exception {
-    try(CLUSTER cluster = newCluster(NUM_SERVERS)) {
-      cluster.start();
-      testWithLoad(10, 500, false, cluster, LOG);
-    }
+    runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(10, 300, false, cluster, LOG));
   }
 
-  public static void testWithLoad(final int numClients, final int numMessages,
+  static void testWithLoad(final int numClients, final int numMessages,
       boolean useAsync, MiniRaftCluster cluster, Logger LOG) throws Exception {
     LOG.info("Running testWithLoad: numClients=" + numClients
         + ", numMessages=" + numMessages + ", async=" + useAsync);
@@ -371,12 +370,12 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
 
     int count = 0;
     for(;; ) {
-      if (clients.stream().filter(Client4TestWithLoad::isRunning).count() == 0) {
+      if (clients.stream().noneMatch(Client4TestWithLoad::isRunning)) {
         break;
       }
 
       final int n = clients.stream().mapToInt(c -> c.step.get()).sum();
-      assertTrue(n >= lastStep.get());
+      Assert.assertTrue(n >= lastStep.get());
 
       if (n - lastStep.get() < 50 * numClients) { // Change leader at least 50 steps.
         Thread.sleep(10);
@@ -406,7 +405,6 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
   }
 
   public static void testRequestTimeout(boolean async, MiniRaftCluster cluster, Logger LOG)
throws Exception {
-    LOG.info("Running testRequestTimeout");
     waitForLeader(cluster);
     long time = System.currentTimeMillis();
     try (final RaftClient client = cluster.createClient()) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/609773e0/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 19422bc..a96b917 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -156,11 +156,17 @@ public interface RaftTestUtil {
     }
   }
 
-  static void assertLogEntries(MiniRaftCluster cluster, SimpleMessage... expectedMessages)
{
+  static void assertLogEntries(MiniRaftCluster cluster, SimpleMessage[] expectedMessages)
{
+    for(SimpleMessage m : expectedMessages) {
+      assertLogEntries(cluster, m);
+    }
+  }
+
+  static void assertLogEntries(MiniRaftCluster cluster, SimpleMessage expectedMessage) {
     final int size = cluster.getServers().size();
     final long count = cluster.getServerAliveStream()
         .map(s -> s.getState().getLog())
-        .filter(log -> logEntriesContains(log, expectedMessages))
+        .filter(log -> logEntriesContains(log, expectedMessage))
         .count();
     if (2*count <= size) {
       throw new AssertionError("Not in majority: size=" + size

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/609773e0/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index d5fdf53..d4c4021 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -154,7 +154,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
     Preconditions.assertNull(previous, "previous");
     final String s = entry.getStateMachineLogEntry().getLogData().toStringUtf8();
     dataMap.put(s, entry);
-    LOG.info("put {}, {} -> {}", entry.getIndex(), s, ServerProtoUtils.toLogEntryString(entry));
+    LOG.info("{}: put {}, {} -> {}", getId(), entry.getIndex(), s, ServerProtoUtils.toLogEntryString(entry));
   }
 
   @Override
@@ -290,7 +290,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
       if (entry != null) {
         return CompletableFuture.completedFuture(Message.valueOf(entry.toByteString()));
       }
-      exception = new IndexOutOfBoundsException("Log entry not found for query " + string);
+      exception = new IndexOutOfBoundsException(getId() + ": LogEntry not found for query
" + string);
     } catch (Exception e) {
       LOG.warn("Failed request " + request, e);
       exception = e;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/609773e0/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java
index 614787e..a12c52f 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/609773e0/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index d98be53..50b2d7f 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -27,11 +27,13 @@ import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.ratis.RaftTestUtil.waitForLeader;
 
@@ -53,19 +55,16 @@ public class TestRaftWithGrpc
 
   @Test
   public void testRequestTimeout() throws Exception {
-    try(MiniRaftClusterWithGrpc cluster = newCluster(NUM_SERVERS)) {
-      cluster.start();
-      testRequestTimeout(false, cluster, LOG);
-    }
+    runWithNewCluster(NUM_SERVERS, cluster -> testRequestTimeout(false, cluster, LOG));
   }
 
   @Test
   public void testUpdateViaHeartbeat() throws Exception {
-    LOG.info("Running testUpdateViaHeartbeat");
-    final MiniRaftClusterWithGrpc cluster = newCluster(NUM_SERVERS);
-    cluster.start();
+    runWithNewCluster(NUM_SERVERS, this::runTestUpdateViaHeartbeat);
+  }
+
+  void runTestUpdateViaHeartbeat(MiniRaftClusterWithGrpc cluster) throws Exception {
     waitForLeader(cluster);
-    long waitTime = 5000;
     try (final RaftClient client = cluster.createClient()) {
       // block append requests
       cluster.getServerAliveStream()
@@ -75,7 +74,7 @@ public class TestRaftWithGrpc
 
       CompletableFuture<RaftClientReply>
           replyFuture = client.sendAsync(new RaftTestUtil.SimpleMessage("abc"));
-      Thread.sleep(waitTime);
+      TimeDuration.valueOf(5 , TimeUnit.SECONDS).sleep();
       // replyFuture should not be completed until append request is unblocked.
       Assert.assertTrue(!replyFuture.isDone());
       // unblock append request.
@@ -84,27 +83,28 @@ public class TestRaftWithGrpc
           .map(SimpleStateMachine4Testing::get)
           .forEach(SimpleStateMachine4Testing::unblockWriteStateMachineData);
 
-      long index = cluster.getLeader().getState().getLog().getNextIndex();
+      final long index = cluster.getLeader().getState().getLog().getNextIndex();
       TermIndex[] leaderEntries = cluster.getLeader().getState().getLog().getEntries(0, Integer.MAX_VALUE);
       // The entries have been appended in the followers
       // although the append entry timed out at the leader
-      cluster.getServerAliveStream().forEach(raftServer -> {
+
+      final TimeDuration sleepTime = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+      cluster.getServerAliveStream().filter(impl -> !impl.isLeader()).forEach(raftServer
->
+          JavaUtils.runAsUnchecked(() -> JavaUtils.attempt(() -> {
         Assert.assertEquals(raftServer.getState().getLog().getNextIndex(), index);
-        if (!raftServer.isLeader()) {
-          TermIndex[] serverEntries = raftServer.getState().getLog().getEntries(0, Integer.MAX_VALUE);
-          Assert.assertArrayEquals(serverEntries, leaderEntries);
-        }
-      });
+        TermIndex[] serverEntries = raftServer.getState().getLog().getEntries(0, Integer.MAX_VALUE);
+        Assert.assertArrayEquals(serverEntries, leaderEntries);
+      }, 10, sleepTime, "assertRaftLog-" + raftServer.getId(), LOG)));
 
       // Wait for heartbeats from leader to be received by followers
       Thread.sleep(500);
-      RaftServerTestUtil.getLogAppenders(cluster.getLeader()).forEach(logAppender -> {
+      RaftServerTestUtil.getLogAppenders(cluster.getLeader()).forEach(logAppender ->
+        JavaUtils.runAsUnchecked(() -> JavaUtils.attempt(() -> {
         // FollowerInfo in the leader state should have updated next and match index.
         final long followerMatchIndex = logAppender.getFollower().getMatchIndex();
         Assert.assertTrue(followerMatchIndex >= index - 1);
         Assert.assertEquals(followerMatchIndex + 1, logAppender.getFollower().getNextIndex());
-      });
+      }, 10, sleepTime, "assertRaftLog-" + logAppender.getFollower(), LOG)));
     }
-    cluster.shutdown();
   }
 }



Mime
View raw message