From commits-return-490-archive-asf-public=cust-asf.ponee.io@ratis.incubator.apache.org Fri Dec 14 02:57:06 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 416A9180679 for ; Fri, 14 Dec 2018 02:57:05 +0100 (CET) Received: (qmail 28012 invoked by uid 500); 14 Dec 2018 01:57:04 -0000 Mailing-List: contact commits-help@ratis.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ratis.incubator.apache.org Delivered-To: mailing list commits@ratis.incubator.apache.org Received: (qmail 27995 invoked by uid 99); 14 Dec 2018 01:57:04 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 14 Dec 2018 01:57:04 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id DEC8A180C6D for ; Fri, 14 Dec 2018 01:57:03 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -11.691 X-Spam-Level: X-Spam-Status: No, score=-11.691 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, SPF_PASS=-0.001, T_MIXED_ES=0.01, USER_IN_DEF_SPF_WL=-7.5] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id xVXBkr8aDCjb for ; Fri, 14 Dec 2018 01:57:00 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 6329F60E01 for ; Fri, 14 Dec 2018 01:56:59 +0000 (UTC) Received: (qmail 27058 invoked by uid 99); 14 Dec 2018 01:56:58 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 14 Dec 2018 01:56:58 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4120EE1456; Fri, 14 Dec 2018 01:56:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: szetszwo@apache.org To: commits@ratis.incubator.apache.org Message-Id: <23feb36dfe55497bb3311daaa9cd15d0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-ratis git commit: RATIS-452. Fix bugs in RaftExceptionBaseTest and RaftReconfigurationBaseTest. Date: Fri, 14 Dec 2018 01:56:58 +0000 (UTC) Repository: incubator-ratis Updated Branches: refs/heads/master 00274fa39 -> cb68b56c2 RATIS-452. Fix bugs in RaftExceptionBaseTest and RaftReconfigurationBaseTest. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/cb68b56c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/cb68b56c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/cb68b56c Branch: refs/heads/master Commit: cb68b56c2cb02ff3953b146d1029156941f33d3a Parents: 00274fa Author: Tsz Wo Nicholas Sze Authored: Fri Dec 14 09:56:27 2018 +0800 Committer: Tsz Wo Nicholas Sze Committed: Fri Dec 14 09:56:27 2018 +0800 ---------------------------------------------------------------------- .../java/org/apache/ratis/util/ExitUtils.java | 119 +++++++++--- .../test/java/org/apache/ratis/BaseTest.java | 10 + .../TestLeaderElectionWithHadoopRpc.java | 6 +- .../TestRaftExceptionWithHadoopRpc.java | 9 +- .../apache/ratis/server/impl/LogAppender.java | 8 +- .../java/org/apache/ratis/MiniRaftCluster.java | 56 +++++- .../java/org/apache/ratis/RaftBasicTests.java | 4 +- .../org/apache/ratis/RaftExceptionBaseTest.java | 190 +++++++++---------- .../java/org/apache/ratis/RaftTestUtil.java | 30 +-- .../impl/RaftReconfigurationBaseTest.java | 17 +- .../org/apache/ratis/grpc/TestRaftWithGrpc.java | 23 ++- .../server/storage/TestSegmentedRaftLog.java | 1 + .../org/apache/ratis/util/TestExitUtils.java | 59 ++++++ 13 files changed, 341 insertions(+), 191 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cb68b56c/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java index 47d3550..669c13c 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,65 +18,112 @@ package org.apache.ratis.util; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; /** Facilitates hooking process termination for tests and debugging. */ -public class ExitUtils { - public static class ExitException extends RuntimeException { +public interface ExitUtils { + class ExitException extends RuntimeException { private static final long serialVersionUID = 1L; - public final int status; + private final int status; - public ExitException(int status, String message, Throwable throwable) { + ExitException(int status, String message, Throwable throwable) { super(message, throwable); this.status = status; } + + public int getStatus() { + return status; + } } - private static volatile boolean systemExitDisabled = false; - private static volatile ExitException firstExitException; + class States { + private static final Logger LOG = LoggerFactory.getLogger(ExitUtils.class); + private static final States INSTANCE = new States(); + + private volatile boolean systemExitDisabled = false; + private volatile boolean terminateOnUncaughtException = true; + private final AtomicReference firstExitException = new AtomicReference<>(); + + private States() { + Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> { + if (terminateOnUncaughtException) { + terminate(-1, thread + " has thrown an uncaught exception", exception, false, LOG); + } + }); + } + + private void setTerminateOnUncaughtException(boolean terminateOnUncaughtException) { + this.terminateOnUncaughtException = terminateOnUncaughtException; + } + + private void disableSystemExit() { + systemExitDisabled = true; + } + + private boolean isSystemExitDisabled() { + return systemExitDisabled; + } + + private ExitException getFirstExitException() { + return firstExitException.get(); + } + + private boolean setFirstExitException(ExitException e) { + Objects.requireNonNull(e, "e == null"); + return firstExitException.compareAndSet(null, e); + } + + private boolean clearFirstExitException() { + return firstExitException.getAndSet(null) != null; + } + } /** * @return the first {@link ExitException} thrown, or null if none thrown yet. */ - public static ExitException getFirstExitException() { - return firstExitException; + static ExitException getFirstExitException() { + return States.INSTANCE.getFirstExitException(); } /** - * Reset the tracking of process termination. - * This is useful when some tests expect an exit but the others do not. + * Clear all previous terminate(..) calls, if there are any. + * + * @return true if the state is changed. */ - public static void resetFirstExitException() { - firstExitException = null; + static boolean clear() { + return States.INSTANCE.clearFirstExitException(); } - /** @return true if {@link #terminate(int, String, Throwable, Logger)} has been invoked. */ - public static boolean isTerminated() { - // Either this member is set or System.exit is actually invoked. - return firstExitException != null; + /** @return true if one of the terminate(..) methods has been invoked. */ + static boolean isTerminated() { + return getFirstExitException() != null; } - public static void assertNotTerminated() { + /** @throws AssertionError if {@link #isTerminated()} == true. */ + static void assertNotTerminated() { if (ExitUtils.isTerminated()) { throw new AssertionError("Unexpected exited.", getFirstExitException()); } } - /** Disable the use of {@link System#exit(int)} for testing. */ - public static void disableSystemExit() { - systemExitDisabled = true; + /** Disable the use of {@link System#exit(int)}. */ + static void disableSystemExit() { + States.INSTANCE.disableSystemExit(); } /** - * Terminate the current process. Note that terminate is the *only* method - * that should be used to terminate the daemon processes. + * * * @param status Exit status * @param message message used to create the {@code ExitException} - * @throws ExitException if System.exit is disabled for test purposes + * @param throwExitException decide if this method should throw {@link ExitException} + * @throws ExitException if throwExitException == true and System.exit is disabled. */ - public static void terminate( - int status, String message, Throwable throwable, Logger log) + static void terminate(int status, String message, Throwable throwable, boolean throwExitException, Logger log) throws ExitException { if (log != null) { final String s = "Terminating with exit status " + status + ": " + message; @@ -87,18 +134,28 @@ public class ExitUtils { } } - if (!systemExitDisabled) { + if (!States.INSTANCE.isSystemExitDisabled()) { System.exit(status); } final ExitException ee = new ExitException(status, message, throwable); - if (firstExitException == null) { - firstExitException = ee; + + States.INSTANCE.setFirstExitException(ee); + + if (throwExitException) { + throw ee; } - throw ee; } - public static void terminate(int status, String message, Logger log) { + static void terminate(int status, String message, Throwable throwable, Logger log) { + terminate(status, message, throwable, true, log); + } + + static void terminate(int status, String message, Logger log) { terminate(status, message, null, log); } + + static void setTerminateOnUncaughtException(boolean terminateOnUncaughtException) { + States.INSTANCE.setTerminateOnUncaughtException(terminateOnUncaughtException); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cb68b56c/ratis-common/src/test/java/org/apache/ratis/BaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java index f7015b7..55024bb 100644 --- a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java +++ b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java @@ -19,11 +19,13 @@ package org.apache.ratis; import org.apache.log4j.Level; import org.apache.ratis.conf.ConfUtils; +import org.apache.ratis.util.ExitUtils; import org.apache.ratis.util.FileUtils; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LogUtils; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.function.CheckedRunnable; +import org.junit.After; import org.junit.Assert; import org.junit.Rule; import org.junit.rules.TestName; @@ -43,10 +45,18 @@ public abstract class BaseTest { public final Logger LOG = LoggerFactory.getLogger(getClass()); public static final TimeDuration HUNDRED_MILLIS = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS); + public static final TimeDuration ONE_SECOND = TimeDuration.valueOf(1, TimeUnit.SECONDS); { LogUtils.setLogLevel(ConfUtils.LOG, Level.WARN); LogUtils.setLogLevel(FileUtils.LOG, Level.TRACE); + + ExitUtils.disableSystemExit(); + } + + @After + public void assertNotTerminated() { + ExitUtils.assertNotTerminated(); } @Rule http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cb68b56c/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestLeaderElectionWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestLeaderElectionWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestLeaderElectionWithHadoopRpc.java index a730fa5..b7cd60a 100644 --- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestLeaderElectionWithHadoopRpc.java +++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestLeaderElectionWithHadoopRpc.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -25,8 +25,6 @@ import org.apache.ratis.server.impl.LeaderElectionTests; import org.apache.ratis.util.LogUtils; import org.junit.Test; -import java.io.IOException; - public class TestLeaderElectionWithHadoopRpc extends LeaderElectionTests implements MiniRaftClusterWithHadoopRpc.Factory.Get { @@ -35,7 +33,7 @@ public class TestLeaderElectionWithHadoopRpc } @Override - public MiniRaftClusterWithHadoopRpc newCluster(int numPeers) throws IOException { + public MiniRaftClusterWithHadoopRpc newCluster(int numPeers) { final Configuration conf = new Configuration(); HadoopConfigKeys.Ipc.setHandlers(conf, 20); conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cb68b56c/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftExceptionWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftExceptionWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftExceptionWithHadoopRpc.java index 33e4b42..f8b04f7 100644 --- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftExceptionWithHadoopRpc.java +++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftExceptionWithHadoopRpc.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,16 +20,13 @@ package org.apache.ratis.hadooprpc; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.ratis.RaftExceptionBaseTest; -import org.apache.ratis.conf.RaftProperties; - -import java.io.IOException; public class TestRaftExceptionWithHadoopRpc extends RaftExceptionBaseTest implements MiniRaftClusterWithHadoopRpc.Factory.Get { @Override - public MiniRaftClusterWithHadoopRpc newCluster(int numPeers) throws IOException { + public MiniRaftClusterWithHadoopRpc newCluster(int numPeers) { final Configuration conf = new Configuration(); HadoopConfigKeys.Ipc.setHandlers(conf, 20); conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); @@ -37,6 +34,4 @@ public class TestRaftExceptionWithHadoopRpc conf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000); return MiniRaftClusterWithHadoopRpc.FACTORY.newCluster(numPeers, getProperties(), conf); } - - } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cb68b56c/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java index 633496e..f9c8fc5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java @@ -88,9 +88,11 @@ public class LogAppender { follower.getPeer().getId() + ")"; } - public void startAppender() { - lifeCycle.transition(STARTING); - daemon.start(); + void startAppender() { + // The life cycle state could be already closed due to server shutdown. + if (lifeCycle.compareAndTransition(NEW, STARTING)) { + daemon.start(); + } } private void runAppender() { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cb68b56c/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index 31faf35..91bc51c 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -64,12 +64,14 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Timer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -90,7 +92,7 @@ public abstract class MiniRaftCluster implements Closeable { public static abstract class Factory { public interface Get { - Supplier properties = JavaUtils.memoize(() -> new RaftProperties()); + Supplier properties = JavaUtils.memoize(RaftProperties::new); Factory getFactory(); @@ -98,7 +100,7 @@ public abstract class MiniRaftCluster implements Closeable { return properties.get(); } - default CLUSTER newCluster(int numPeers) throws IOException { + default CLUSTER newCluster(int numPeers) { return getFactory().newCluster(numPeers, getProperties()); } @@ -117,12 +119,47 @@ public abstract class MiniRaftCluster implements Closeable { } testCase.accept(cluster); } catch(Throwable t) { - LOG.error("Failed " + caller + ": " + cluster.printServers(), t); + LOG.info(cluster.printServers()); + LOG.error("Failed " + caller, t); throw t; } finally { cluster.shutdown(); } } + + default void runWithSameCluster(int numServers, CheckedConsumer testCase) throws Exception { + final StackTraceElement caller = JavaUtils.getCallerStackTraceElement(); + LOG.info("Running " + caller.getMethodName()); + CLUSTER cluster = null; + try { + cluster = getFactory().reuseCluster(numServers, getProperties()); + testCase.accept(cluster); + } catch(Throwable t) { + if (cluster != null) { + LOG.info(cluster.printServers()); + } + LOG.error("Failed " + caller, t); + throw t; + } + } + } + + private final AtomicReference reusableCluster = new AtomicReference<>(); + + private CLUSTER reuseCluster(int numServers, RaftProperties prop) throws IOException { + for(;;) { + final CLUSTER cluster = reusableCluster.get(); + if (cluster != null) { + return cluster; + } + + final CLUSTER newCluster = newCluster(numServers, prop); + if (reusableCluster.compareAndSet(null, newCluster)) { + newCluster.start(); + Runtime.getRuntime().addShutdownHook(new Thread(newCluster::shutdown)); + return newCluster; + } + } } public abstract CLUSTER newCluster( @@ -203,9 +240,8 @@ public abstract class MiniRaftCluster implements Closeable { protected final Map peers = new ConcurrentHashMap<>(); private volatile StateMachine.Registry stateMachineRegistry = null; - private volatile TimeDuration retryInterval; - private final Timer timer; + private final AtomicReference timer = new AtomicReference<>(); protected MiniRaftCluster(String[] ids, RaftProperties properties, Parameters parameters) { this.group = initRaftGroup(Arrays.asList(ids)); @@ -213,8 +249,6 @@ public abstract class MiniRaftCluster implements Closeable { this.properties = new RaftProperties(properties); this.parameters = parameters; - this.timer = JavaUtils.runRepeatedly(() -> LOG.info("TIMED-PRINT: " + printServers()), - 10, 10, TimeUnit.SECONDS); ExitUtils.disableSystemExit(); } @@ -253,6 +287,9 @@ public abstract class MiniRaftCluster implements Closeable { initServers(); startServers(servers.values()); + + this.timer.updateAndGet(t -> t != null? t + : JavaUtils.runRepeatedly(() -> LOG.info("TIMED-PRINT: " + printServers()), 10, 10, TimeUnit.SECONDS)); } /** @@ -686,6 +723,9 @@ public abstract class MiniRaftCluster implements Closeable { LOG.info("************************************************************** "); LOG.info(printServers()); + // TODO: classes like RaftLog may throw uncaught exception during shutdown (e.g. write after close) + ExitUtils.setTerminateOnUncaughtException(false); + final ExecutorService executor = Executors.newFixedThreadPool(servers.size(), Daemon::new); getServers().forEach(proxy -> executor.submit(proxy::close)); try { @@ -696,7 +736,7 @@ public abstract class MiniRaftCluster implements Closeable { LOG.warn("shutdown interrupted", e); } - timer.cancel(); + Optional.ofNullable(timer.get()).ifPresent(Timer::cancel); ExitUtils.assertNotTerminated(); LOG.info(getClass().getSimpleName() + " shutdown completed"); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cb68b56c/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index f182132..b4efcc6 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -179,7 +179,7 @@ public abstract class RaftBasicTests } SimpleMessage[] messages = SimpleMessage.create(1); - RaftTestUtil.sendMessageInNewThread(cluster, messages); + RaftTestUtil.sendMessageInNewThread(cluster, leaderId, messages); Thread.sleep(cluster.getMaxTimeout() + 100); RaftLog followerLog = followerToSendLog.getState().getLog(); @@ -219,7 +219,7 @@ public abstract class RaftBasicTests } SimpleMessage[] messages = SimpleMessage.create(1); - RaftTestUtil.sendMessageInNewThread(cluster, messages); + RaftTestUtil.sendMessageInNewThread(cluster, leaderId, messages); Thread.sleep(cluster.getMaxTimeout() + 100); RaftTestUtil.logEntriesContains(followerToCommit.getState().getLog(), messages); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cb68b56c/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java index bf43ba6..6d58bbc 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,16 +21,15 @@ import org.apache.log4j.Level; import org.apache.ratis.RaftTestUtil.SimpleMessage; import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientRpc; -import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.*; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.storage.RaftLog; -import org.apache.ratis.util.SizeInBytes; +import org.apache.ratis.server.storage.RaftLogIOException; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LogUtils; -import org.junit.After; +import org.apache.ratis.util.SizeInBytes; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import java.io.IOException; @@ -47,29 +46,15 @@ public abstract class RaftExceptionBaseTest LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); } - public static final int NUM_PEERS = 3; - - private CLUSTER cluster; + static final int NUM_PEERS = 3; - @Before - public void setup() throws IOException { - final RaftProperties prop = getProperties(); - RaftServerConfigKeys.Log.Appender - .setBufferByteLimit(prop, SizeInBytes.valueOf("4KB")); - cluster = newCluster(NUM_PEERS); - cluster.start(); - } - - @After - public void tearDown() { - if (cluster != null) { - cluster.shutdown(); - } + { + RaftServerConfigKeys.Log.Appender.setBufferByteLimit(getProperties(), SizeInBytes.valueOf("4KB")); } @Test public void testHandleNotLeaderException() throws Exception { - testHandleNotLeaderException(false); + runWithNewCluster(NUM_PEERS, cluster -> runTestHandleNotLeaderException(false, cluster)); } /** @@ -77,100 +62,92 @@ public abstract class RaftExceptionBaseTest */ @Test public void testHandleNotLeaderAndIOException() throws Exception { - testHandleNotLeaderException(true); + runWithNewCluster(NUM_PEERS, cluster -> runTestHandleNotLeaderException(true, cluster)); } - private void testHandleNotLeaderException(boolean killNewLeader) - throws Exception { - RaftTestUtil.waitForLeader(cluster); - final RaftPeerId leaderId = cluster.getLeader().getId(); - final RaftClient client = cluster.createClient(leaderId); + void runTestHandleNotLeaderException(boolean killNewLeader, CLUSTER cluster) throws Exception { + final RaftPeerId oldLeader = RaftTestUtil.waitForLeader(cluster).getId(); + try(final RaftClient client = cluster.createClient(oldLeader)) { + sendMessage("m1", client); - RaftClientReply reply = client.send(new SimpleMessage("m1")); - Assert.assertTrue(reply.isSuccess()); + // enforce leader change + final RaftPeerId newLeader = RaftTestUtil.changeLeader(cluster, oldLeader); + + if (killNewLeader) { + // kill the new leader + cluster.killServer(newLeader); + } - // enforce leader change - RaftPeerId newLeader = RaftTestUtil.changeLeader(cluster, leaderId); + final RaftClientRpc rpc = client.getClientRpc(); + JavaUtils.attempt(() -> assertNotLeaderException(newLeader, "m2", oldLeader, rpc, cluster), + 10, ONE_SECOND, "assertNotLeaderException", LOG); - if (killNewLeader) { - // kill the new leader - cluster.killServer(newLeader); + sendMessage("m3", client); } + } - RaftClientRpc rpc = client.getClientRpc(); - reply= null; - for (int i = 0; reply == null && i < 10; i++) { - try { - reply = rpc.sendRequest(cluster.newRaftClientRequest( - ClientId.randomId(), leaderId, new SimpleMessage("m2"))); - } catch (IOException ignored) { - Thread.sleep(1000); - } - } + RaftClientReply assertNotLeaderException(RaftPeerId expectedSuggestedLeader, + String messageId, RaftPeerId server, RaftClientRpc rpc, CLUSTER cluster) throws IOException { + final SimpleMessage message = new SimpleMessage(messageId); + final RaftClientReply reply = rpc.sendRequest(cluster.newRaftClientRequest(ClientId.randomId(), server, message)); Assert.assertNotNull(reply); Assert.assertFalse(reply.isSuccess()); final NotLeaderException nle = reply.getNotLeaderException(); Objects.requireNonNull(nle); - Assert.assertEquals(newLeader, nle.getSuggestedLeader().getId()); + Assert.assertEquals(expectedSuggestedLeader, nle.getSuggestedLeader().getId()); + return reply; + } - reply = client.send(new SimpleMessage("m3")); + static void sendMessage(String message, RaftClient client) throws IOException { + final RaftClientReply reply = client.send(new SimpleMessage(message)); Assert.assertTrue(reply.isSuccess()); - client.close(); } @Test public void testNotLeaderExceptionWithReconf() throws Exception { - Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster)); - - final RaftPeerId leaderId = cluster.getLeader().getId(); - final RaftClient client = cluster.createClient(leaderId); - - // enforce leader change - RaftPeerId newLeader = RaftTestUtil.changeLeader(cluster, leaderId); - - // also add two new peers - // add two more peers - MiniRaftCluster.PeerChanges change = cluster.addNewPeers( - new String[]{"ss1", "ss2"}, true); - // trigger setConfiguration - LOG.info("Start changing the configuration: {}", - Arrays.asList(change.allPeersInNewConf)); - try(final RaftClient c2 = cluster.createClient(newLeader)) { - RaftClientReply reply = c2.setConfiguration(change.allPeersInNewConf); - Assert.assertTrue(reply.isSuccess()); - } - LOG.info(cluster.printServers()); - - RaftClientRpc rpc = client.getClientRpc(); - RaftClientReply reply = null; - // it is possible that the remote peer's rpc server is not ready. need retry - for (int i = 0; reply == null && i < 10; i++) { - try { - reply = rpc.sendRequest(cluster.newRaftClientRequest( - ClientId.randomId(), leaderId, new SimpleMessage("m1"))); - } catch (IOException ignored) { - Thread.sleep(1000); + runWithNewCluster(NUM_PEERS, this::runTestNotLeaderExceptionWithReconf); + } + + void runTestNotLeaderExceptionWithReconf(CLUSTER cluster) throws Exception { + final RaftPeerId oldLeader = RaftTestUtil.waitForLeader(cluster).getId(); + try(final RaftClient client = cluster.createClient(oldLeader)) { + + // enforce leader change + final RaftPeerId newLeader = RaftTestUtil.changeLeader(cluster, oldLeader); + + // add two more peers + MiniRaftCluster.PeerChanges change = cluster.addNewPeers(new String[]{"ss1", "ss2"}, true); + // trigger setConfiguration + LOG.info("Start changing the configuration: {}", Arrays.asList(change.allPeersInNewConf)); + try (final RaftClient c2 = cluster.createClient(newLeader)) { + RaftClientReply reply = c2.setConfiguration(change.allPeersInNewConf); + Assert.assertTrue(reply.isSuccess()); + } + LOG.info(cluster.printServers()); + + // it is possible that the remote peer's rpc server is not ready. need retry + final RaftClientRpc rpc = client.getClientRpc(); + final RaftClientReply reply = JavaUtils.attempt( + () -> assertNotLeaderException(newLeader, "m1", oldLeader, rpc, cluster), + 10, ONE_SECOND, "assertNotLeaderException", LOG); + + final Collection peers = cluster.getPeers(); + final RaftPeer[] peersFromReply = reply.getNotLeaderException().getPeers(); + Assert.assertEquals(peers.size(), peersFromReply.length); + for (RaftPeer p : peersFromReply) { + Assert.assertTrue(peers.contains(p)); } - } - Assert.assertNotNull(reply); - Assert.assertFalse(reply.isSuccess()); - final NotLeaderException nle = reply.getNotLeaderException(); - Objects.requireNonNull(nle); - Assert.assertEquals(newLeader, nle.getSuggestedLeader().getId()); - Collection peers = cluster.getPeers(); - RaftPeer[] peersFromReply = reply.getNotLeaderException().getPeers(); - Assert.assertEquals(peers.size(), peersFromReply.length); - for (RaftPeer p : peersFromReply) { - Assert.assertTrue(peers.contains(p)); - } - reply = client.send(new SimpleMessage("m2")); - Assert.assertTrue(reply.isSuccess()); - client.close(); + sendMessage("m2", client); + } } @Test public void testGroupMismatchException() throws Exception { + runWithSameCluster(NUM_PEERS, this::runTestGroupMismatchException); + } + + void runTestGroupMismatchException(CLUSTER cluster) throws Exception { final RaftGroup clusterGroup = cluster.getGroup(); Assert.assertEquals(NUM_PEERS, clusterGroup.getPeers().size()); @@ -199,6 +176,10 @@ public abstract class RaftExceptionBaseTest @Test public void testStaleReadException() throws Exception { + runWithSameCluster(NUM_PEERS, this::runTestStaleReadException); + } + + void runTestStaleReadException(CLUSTER cluster) throws Exception { RaftTestUtil.waitForLeader(cluster); try (RaftClient client = cluster.createClient()) { final RaftPeerId follower = cluster.getFollowers().iterator().next().getId(); @@ -210,19 +191,18 @@ public abstract class RaftExceptionBaseTest @Test public void testLogAppenderBufferCapacity() throws Exception { - RaftTestUtil.waitForLeader(cluster); - final RaftPeerId leaderId = cluster.getLeader().getId(); - final RaftClient client = cluster.createClient(leaderId); + runWithSameCluster(NUM_PEERS, this::runTestLogAppenderBufferCapacity); + } + + void runTestLogAppenderBufferCapacity(CLUSTER cluster) throws Exception { + final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); byte[] bytes = new byte[8192]; Arrays.fill(bytes, (byte) 1); - SimpleMessage msg = - new SimpleMessage(new String(bytes)); - try { - client.send(msg); - Assert.fail("Expected StateMachineException not thrown"); - } catch (StateMachineException sme) { - Assert.assertTrue(sme.getMessage() - .contains("exceeds the max buffer limit")); + SimpleMessage msg = new SimpleMessage(new String(bytes)); + try (RaftClient client = cluster.createClient(leaderId)) { + testFailureCase("testLogAppenderBufferCapacity", + () -> client.send(msg), + StateMachineException.class, RaftLogIOException.class); } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cb68b56c/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index 6f0a20a..fe8dd37 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -36,6 +36,7 @@ import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.AutoCloseableLock; +import org.apache.ratis.util.CollectionUtils; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.ProtoUtils; @@ -188,27 +189,29 @@ public interface RaftTestUtil { } } - static void assertLogEntries(RaftLog log, long expectedTerm, SimpleMessage... expectedMessages) { - final TermIndex[] termIndices = log.getEntries(1, Long.MAX_VALUE); - final List entries = new ArrayList<>(expectedMessages.length); - for (TermIndex ti : termIndices) { - final LogEntryProto e; + static Iterable getLogEntryProtos(RaftLog log) { + return CollectionUtils.as(log.getEntries(0, Long.MAX_VALUE), ti -> { try { - e = log.get(ti.getIndex()); + return log.get(ti.getIndex()); } catch (IOException exception) { throw new AssertionError("Failed to get log at " + ti, exception); } + }); + } + static void assertLogEntries(RaftLog log, long expectedTerm, SimpleMessage... expectedMessages) { + final List entries = new ArrayList<>(expectedMessages.length); + for(LogEntryProto e : getLogEntryProtos(log)) { + final String s = ServerProtoUtils.toString(e); if (e.hasStateMachineLogEntry()) { - LOG.info(ServerProtoUtils.toString(e) + ", " + e.getStateMachineLogEntry().toString().trim().replace("\n", ", ")); + LOG.info(s + ", " + e.getStateMachineLogEntry().toString().trim().replace("\n", ", ")); entries.add(e); } else if (e.hasConfigurationEntry()) { - LOG.info("Found ConfigurationEntry at {}, ignoring it.", ti); + LOG.info("Found {}, ignoring it.", s); } else if (e.hasMetadataEntry()) { - LOG.info("Found MetadataEntry at {}, ignoring it.", ti); + LOG.info("Found {}, ignoring it.", s); } else { - throw new AssertionError("Unexpected LogEntryBodyCase " + e.getLogEntryBodyCase() + " at " + ti - + ": " + ServerProtoUtils.toString(e)); + throw new AssertionError("Unexpected LogEntryBodyCase " + e.getLogEntryBodyCase() + " at " + s); } } @@ -384,8 +387,7 @@ public interface RaftTestUtil { Thread.sleep(3 * maxTimeout); } - static void sendMessageInNewThread(MiniRaftCluster cluster, SimpleMessage... messages) { - RaftPeerId leaderId = cluster.getLeader().getId(); + static void sendMessageInNewThread(MiniRaftCluster cluster, RaftPeerId leaderId, SimpleMessage... messages) { new Thread(() -> { try (final RaftClient client = cluster.createClient(leaderId)) { for (SimpleMessage mssg: messages) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cb68b56c/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index 3bf1ce7..a8f4c7b 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -55,7 +55,6 @@ public abstract class RaftReconfigurationBaseTest { static { LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); - LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); } private static final DelayLocalExecutionInjection logSyncDelay = @@ -337,11 +336,15 @@ public abstract class RaftReconfigurationBaseTest 2", commitIndex <= 2); + } final RaftPeerId killed = RaftTestUtil.waitAndKillLeader(cluster); Assert.assertEquals(leaderId, killed); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cb68b56c/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java index 50b2d7f..7026b33 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java @@ -25,6 +25,7 @@ import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.server.impl.BlockRequestHandlingInjection; import org.apache.ratis.server.impl.RaftServerTestUtil; import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.statemachine.SimpleStateMachine4Testing; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.util.JavaUtils; @@ -83,28 +84,30 @@ public class TestRaftWithGrpc .map(SimpleStateMachine4Testing::get) .forEach(SimpleStateMachine4Testing::unblockWriteStateMachineData); - final long index = cluster.getLeader().getState().getLog().getNextIndex(); - TermIndex[] leaderEntries = cluster.getLeader().getState().getLog().getEntries(0, Integer.MAX_VALUE); + final RaftLog leaderLog = cluster.getLeader().getState().getLog(); // The entries have been appended in the followers // although the append entry timed out at the leader - - final TimeDuration sleepTime = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS); cluster.getServerAliveStream().filter(impl -> !impl.isLeader()).forEach(raftServer -> JavaUtils.runAsUnchecked(() -> JavaUtils.attempt(() -> { - Assert.assertEquals(raftServer.getState().getLog().getNextIndex(), index); - TermIndex[] serverEntries = raftServer.getState().getLog().getEntries(0, Integer.MAX_VALUE); + final long leaderNextIndex = leaderLog.getNextIndex(); + final TermIndex[] leaderEntries = leaderLog.getEntries(0, Long.MAX_VALUE); + + final RaftLog followerLog = raftServer.getState().getLog(); + Assert.assertEquals(leaderNextIndex, followerLog.getNextIndex()); + final TermIndex[] serverEntries = followerLog.getEntries(0, Long.MAX_VALUE); Assert.assertArrayEquals(serverEntries, leaderEntries); - }, 10, sleepTime, "assertRaftLog-" + raftServer.getId(), LOG))); + }, 10, HUNDRED_MILLIS, "assertRaftLog-" + raftServer.getId(), LOG))); // Wait for heartbeats from leader to be received by followers Thread.sleep(500); RaftServerTestUtil.getLogAppenders(cluster.getLeader()).forEach(logAppender -> - JavaUtils.runAsUnchecked(() -> JavaUtils.attempt(() -> { + JavaUtils.runAsUnchecked(() -> JavaUtils.attempt(() -> { + final long leaderNextIndex = leaderLog.getNextIndex(); // FollowerInfo in the leader state should have updated next and match index. final long followerMatchIndex = logAppender.getFollower().getMatchIndex(); - Assert.assertTrue(followerMatchIndex >= index - 1); + Assert.assertTrue(followerMatchIndex >= leaderNextIndex - 1); Assert.assertEquals(followerMatchIndex + 1, logAppender.getFollower().getNextIndex()); - }, 10, sleepTime, "assertRaftLog-" + logAppender.getFollower(), LOG))); + }, 10, HUNDRED_MILLIS, "assertRaftLog-" + logAppender.getFollower(), LOG))); } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cb68b56c/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java index 6e516bc..a247678 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java @@ -487,6 +487,7 @@ public class TestSegmentedRaftLog extends BaseTest { Objects.requireNonNull(exitException, "exitException == null"); Assert.assertEquals(TimeoutIOException.class, exitException.getCause().getClass()); }, 3*numRetries, syncTimeout, "RaftLogWorker should catch TimeoutIOException and exit", LOG); + ExitUtils.clear(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cb68b56c/ratis-test/src/test/java/org/apache/ratis/util/TestExitUtils.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestExitUtils.java b/ratis-test/src/test/java/org/apache/ratis/util/TestExitUtils.java new file mode 100644 index 0000000..25b05fd --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/util/TestExitUtils.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +import org.apache.ratis.BaseTest; +import org.apache.ratis.util.ExitUtils.ExitException; +import org.junit.Assert; +import org.junit.Test; + +public class TestExitUtils extends BaseTest { + /** Test if {@link BaseTest} can handle uncaught exception. */ + @Test(timeout = 1000) + public void testUncaughtException() throws Exception { + Assert.assertFalse(ExitUtils.isTerminated()); + Assert.assertFalse(ExitUtils.clear()); + + final Thread t = new Thread(null, () -> { + throw new AssertionError("Testing"); + }, "testThread"); + t.start(); + t.join(); + + Assert.assertTrue(ExitUtils.isTerminated()); + Assert.assertTrue(ExitUtils.clear()); + } + + /** Test if {@link BaseTest} can handle ExitUtils.terminate(..). */ + @Test(timeout = 1000) + public void testExitStatus() { + Assert.assertFalse(ExitUtils.isTerminated()); + Assert.assertFalse(ExitUtils.clear()); + + final int status = -1; + try { + ExitUtils.terminate(status, "testExitStatus", LOG); + Assert.fail(); + } catch (ExitException e) { + Assert.assertEquals(status, e.getStatus()); + } + + Assert.assertTrue(ExitUtils.isTerminated()); + Assert.assertTrue(ExitUtils.clear()); + } +}