ratis-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject incubator-ratis git commit: RATIS-453. When retry failed on an async call, it should fail all the following calls in the sliding window.
Date Tue, 11 Dec 2018 21:22:06 GMT
Repository: incubator-ratis
Updated Branches:
  refs/heads/master 3b0be0287 -> 00274fa39


RATIS-453. When retry failed on an async call, it should fail all the following calls in
the sliding window.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/00274fa3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/00274fa3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/00274fa3

Branch: refs/heads/master
Commit: 00274fa39073e359433005c96097efaa24024702
Parents: 3b0be02
Author: Tsz Wo Nicholas Sze <szetszwo@apache.org>
Authored: Tue Dec 11 13:21:41 2018 -0800
Committer: Tsz Wo Nicholas Sze <szetszwo@apache.org>
Committed: Tue Dec 11 13:21:41 2018 -0800

----------------------------------------------------------------------
 .../ratis/client/impl/RaftClientImpl.java       |  62 ++++++-----
 .../ratis/protocol/AlreadyClosedException.java  |   6 +-
 .../org/apache/ratis/retry/RetryPolicies.java   |   4 +-
 .../org/apache/ratis/retry/RetryPolicy.java     |   5 +-
 .../org/apache/ratis/util/SlidingWindow.java    |  41 ++++++-
 .../function/CheckedFunctionWithTimeout.java    |   2 +-
 .../ratis/util/function/FunctionUtils.java      |  34 ++++++
 .../test/java/org/apache/ratis/BaseTest.java    |  54 +++++-----
 .../grpc/client/GrpcClientProtocolClient.java   |   3 +-
 .../java/org/apache/ratis/MiniRaftCluster.java  |   9 +-
 .../java/org/apache/ratis/RaftAsyncTests.java   | 106 +++++++++++++++----
 .../java/org/apache/ratis/RaftTestUtil.java     |  12 +++
 12 files changed, 252 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 4c73d45..58206bd 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -20,11 +20,12 @@ package org.apache.ratis.client.impl;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.RaftClientRpc;
-import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.*;
 import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.util.*;
+import org.apache.ratis.util.function.FunctionUtils;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -78,11 +79,16 @@ final class RaftClientImpl implements RaftClient {
       replyFuture.complete(reply);
     }
 
+    @Override
+    public void fail(Exception e) {
+      replyFuture.completeExceptionally(e);
+    }
+
     CompletableFuture<RaftClientReply> getReplyFuture() {
       return replyFuture;
     }
 
-    public int getAttemptCount() {
+    int getAttemptCount() {
       return attemptCount;
     }
 
@@ -164,9 +170,10 @@ final class RaftClientImpl implements RaftClient {
     try {
       asyncRequestSemaphore.acquire();
     } catch (InterruptedException e) {
-      throw new CompletionException(IOUtils.toInterruptedIOException(
+      return JavaUtils.completeExceptionally(IOUtils.toInterruptedIOException(
           "Interrupted when sending " + type + ", message=" + message, e));
     }
+
     final long callId = nextCallId();
     final LongFunction<PendingAsyncRequest> constructor = seqNum -> new PendingAsyncRequest(seqNum,
         seq -> newRaftClientRequest(server, callId, seq, message, type));
@@ -268,11 +275,17 @@ final class RaftClientImpl implements RaftClient {
         peersInNewConf.filter(p -> !peers.contains(p))::iterator);
   }
 
-  private CompletableFuture<RaftClientReply> sendRequestWithRetryAsync(
-      PendingAsyncRequest pending) {
-    final RaftClientRequest request = pending.newRequest();
+  private void sendRequestWithRetryAsync(PendingAsyncRequest pending) {
     final CompletableFuture<RaftClientReply> f = pending.getReplyFuture();
-    return sendRequestAsync(request, pending.getAttemptCount()).thenCompose(reply -> {
+    if (f.isDone()) {
+      return;
+    }
+
+    final RaftClientRequest request = pending.newRequest();
+    sendRequestAsync(request, pending.getAttemptCount()).thenAccept(reply -> {
+      if (f.isDone()) {
+        return;
+      }
       if (reply == null) {
         LOG.debug("schedule attempt #{} with policy {} for {}", pending.getAttemptCount(), retryPolicy, request);
         scheduler.onTimeout(retryPolicy.getSleepTime(),
@@ -281,8 +294,7 @@ final class RaftClientImpl implements RaftClient {
       } else {
         f.complete(reply);
       }
-      return f;
-    });
+    }).exceptionally(FunctionUtils.consumerAsNullFunction(f::completeExceptionally));
   }
 
   private RaftClientReply sendRequestWithRetry(
@@ -315,7 +327,7 @@ final class RaftClientImpl implements RaftClient {
         getSlidingWindow(request).receiveReply(
             request.getSeqNum(), reply, this::sendRequestWithRetryAsync);
       } else if (!retryPolicy.shouldRetry(attemptCount)) {
-        return handleAsyncRetry(request, attemptCount);
+        handleAsyncRetryFailure(request, attemptCount);
       }
       return reply;
     }).exceptionally(e -> {
@@ -325,30 +337,22 @@ final class RaftClientImpl implements RaftClient {
         LOG.debug("{}: Failed {} with {}", clientId, request, e);
       }
       e = JavaUtils.unwrapCompletionException(e);
-      if (e instanceof GroupMismatchException) {
-        throw new CompletionException(e);
-      } else if (e instanceof IOException) {
-        // once the retryLimit is hit, just remove the request from the
-        // sliding window and throw an exception. The exception thrown here will
-        // make sure its not retried any more with sendRequestWithRetryAsync call.
+      if (e instanceof IOException && !(e instanceof GroupMismatchException)) {
         if (!retryPolicy.shouldRetry(attemptCount)) {
-          return handleAsyncRetry(request, attemptCount);
+          handleAsyncRetryFailure(request, attemptCount);
+        } else {
+          handleIOException(request, (IOException) e, null);
         }
-        handleIOException(request, (IOException)e, null);
-      } else {
-        throw new CompletionException(e);
+        return null;
       }
-      return null;
+      throw new CompletionException(e);
     });
   }
 
-  private RaftClientReply handleAsyncRetry(RaftClientRequest request, int attemptCount) {
-    RaftClientReply reply = new RaftClientReply(request,
-        new RaftRetryFailureException(
-            "Failed " + request + " for " + attemptCount + " attempts with " + retryPolicy),
null);
-    getSlidingWindow(request).receiveReply(
-        request.getSeqNum(), reply, this::sendRequestWithRetryAsync);
-    return reply;
+  private void handleAsyncRetryFailure(RaftClientRequest request, int attemptCount) {
+    final RaftRetryFailureException rfe = new RaftRetryFailureException(
+        "Failed " + request + " for " + (attemptCount-1) + " attempts with " + retryPolicy);
+    getSlidingWindow(request).fail(request.getSeqNum(), rfe);
   }
 
   private RaftClientReply sendRequest(RaftClientRequest request)

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java
index 85888a0..f69173f 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -24,4 +24,8 @@ public class AlreadyClosedException extends RaftException {
   public AlreadyClosedException(String message) {
     super(message);
   }
+
+  public AlreadyClosedException(String message, Throwable t) {
+    super(message, t);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
index b405f81..e5cdeaa 100644
--- a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
+++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -45,7 +45,7 @@ public interface RetryPolicies {
    * Keep trying a limited number of times, waiting a fixed time between attempts,
    * and then fail by re-throwing the exception.
    */
-  static RetryPolicy retryUpToMaximumCountWithFixedSleep(int maxAttempts, TimeDuration sleepTime) {
+  static RetryLimited retryUpToMaximumCountWithFixedSleep(int maxAttempts, TimeDuration sleepTime) {
     return new RetryLimited(maxAttempts, sleepTime);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
index 771e524..ba90435 100644
--- a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
+++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
  * Policy abstract for retrying.
  */
 public interface RetryPolicy {
+  TimeDuration ZERO_MILLIS = TimeDuration.valueOf(0, TimeUnit.MILLISECONDS);
 
   /**
    * Determines whether it is supposed to retry the connection if the operation
@@ -39,6 +40,6 @@ public interface RetryPolicy {
    * Returns the time duration for sleep in between the retries.
    */
   default TimeDuration getSleepTime() {
-    return TimeDuration.valueOf(0, TimeUnit.MILLISECONDS);
+    return ZERO_MILLIS;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
index ca622dd..a616f07 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.util;
 
+import org.apache.ratis.protocol.AlreadyClosedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,6 +43,9 @@ public interface SlidingWindow {
     void setReply(REPLY reply);
 
     boolean hasReply();
+
+    default void fail(Exception e) {
+    }
   }
 
   /** A seqNum-to-request map, sorted by seqNum. */
@@ -169,6 +173,8 @@ public interface SlidingWindow {
     private long firstSeqNum = -1;
     /** Is the first request replied? */
     private boolean firstReplied;
+    /** The exception, if there is any. */
+    private Exception exception;
 
     public Client(Object name) {
       this.requests = new RequestMap<REQUEST, REPLY>(name) {
@@ -206,6 +212,12 @@ public interface SlidingWindow {
 
       final long seqNum = nextSeqNum++;
       final REQUEST r = requestConstructor.apply(seqNum);
+
+      if (exception != null) {
+        alreadyClosed(r, exception);
+        return r;
+      }
+
       requests.putNewRequest(r);
 
       final boolean submitted = sendOrDelayRequest(r, sendMethod);
@@ -302,6 +314,33 @@ public interface SlidingWindow {
       firstReplied = false;
       LOG.debug("After resetFirstSeqNum: {}", this);
     }
+
+    /** Fail all requests starting from the given seqNum. */
+    public synchronized void fail(final long startingSeqNum, Exception e) {
+      exception = e;
+
+      boolean handled = false;
+      for(long i = startingSeqNum; i <= requests.lastSeqNum(); i++) {
+        final REQUEST request = requests.getNonRepliedRequest(i, "fail");
+        if (request != null) {
+          if (request.getSeqNum() == startingSeqNum) {
+            request.fail(e);
+          } else {
+            alreadyClosed(request, e);
+          }
+          handled = true;
+        }
+      }
+
+      if (handled) {
+        removeRepliedFromHead();
+      }
+    }
+
+    private void alreadyClosed(REQUEST request, Exception e) {
+      request.fail(new AlreadyClosedException(SlidingWindow.class.getSimpleName() + "$" + getClass().getSimpleName()
+          + " " + requests.getName() + " is closed.", e));
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java b/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java
index fddfab2..48b6b9f 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java
@@ -25,7 +25,7 @@ import java.util.concurrent.TimeoutException;
 @FunctionalInterface
 public interface CheckedFunctionWithTimeout<INPUT, OUTPUT, THROWABLE extends Throwable> {
   /**
-   * The same as {@link org.apache.ratis.util.CheckedFunction#apply(Object)}
+   * The same as {@link CheckedFunction#apply(Object)}
    * except that this method has a timeout parameter and throws {@link TimeoutException}.
    */
   OUTPUT apply(INPUT input, TimeDuration timeout) throws TimeoutException, THROWABLE;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-common/src/main/java/org/apache/ratis/util/function/FunctionUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/function/FunctionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/function/FunctionUtils.java
new file mode 100644
index 0000000..0e982cb
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/function/FunctionUtils.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util.function;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public interface FunctionUtils {
+  /**
+   * Convert the given consumer to a function with any output type
+   * such that the returned function always returns null.
+   */
+  static <INPUT, OUTPUT> Function<INPUT, OUTPUT> consumerAsNullFunction(Consumer<INPUT> consumer) {
+    return input -> {
+      consumer.accept(input);
+      return null;
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
index 3612d21..f7015b7 100644
--- a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
+++ b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
@@ -22,6 +22,7 @@ import org.apache.ratis.conf.ConfUtils;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.function.CheckedRunnable;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -35,11 +36,14 @@ import java.io.IOException;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 public abstract class BaseTest {
   public final Logger LOG = LoggerFactory.getLogger(getClass());
 
+  public static final TimeDuration HUNDRED_MILLIS = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+
   {
     LogUtils.setLogLevel(ConfUtils.LOG, Level.WARN);
     LogUtils.setLogLevel(FileUtils.LOG, Level.TRACE);
@@ -83,14 +87,14 @@ public abstract class BaseTest {
   @SafeVarargs
   public static void assertThrowable(
       String description, Throwable t,
-      Class<? extends Throwable> exceptedThrowableClass, Logger log,
-      Class<? extends Throwable>... exceptedCauseClasses) {
+      Class<? extends Throwable> expectedThrowableClass, Logger log,
+      Class<? extends Throwable>... expectedCauseClasses) {
     if (log != null) {
       log.info("The test \"" + description + "\" throws " + t.getClass().getSimpleName(), t);
     }
-    Assert.assertEquals(exceptedThrowableClass, t.getClass());
+    Assert.assertEquals(expectedThrowableClass, t.getClass());
 
-    for (Class<? extends Throwable> expectedCause : exceptedCauseClasses) {
+    for (Class<? extends Throwable> expectedCause : expectedCauseClasses) {
       final Throwable previous = t;
       t = Objects.requireNonNull(previous.getCause(),
           () -> "previous.getCause() == null for previous=" + previous);
@@ -99,48 +103,46 @@ public abstract class BaseTest {
   }
 
   @SafeVarargs
-  public static void testFailureCase(
+  public static Throwable testFailureCase(
       String description, CheckedRunnable<?> testCode,
-      Class<? extends Throwable> exceptedThrowableClass, Logger log,
-      Class<? extends Throwable>... exceptedCauseClasses) {
-    boolean caught = false;
+      Class<? extends Throwable> expectedThrowableClass, Logger log,
+      Class<? extends Throwable>... expectedCauseClasses) {
     try {
       testCode.run();
     } catch (Throwable t) {
-      caught = true;
-      assertThrowable(description, t, exceptedThrowableClass, log, exceptedCauseClasses);
-    }
-    if (!caught) {
-      Assert.fail("The test \"" + description + "\" does not throw anything.");
+      assertThrowable(description, t, expectedThrowableClass, log, expectedCauseClasses);
+      return t;
     }
+    throw new AssertionError("The test \"" + description + "\" does not throw anything.");
   }
 
   @SafeVarargs
-  public final void testFailureCase(
+  public final Throwable testFailureCase(
       String description, CheckedRunnable<?> testCode,
-      Class<? extends Throwable> exceptedThrowableClass,
-      Class<? extends Throwable>... exceptedCauseClasses) {
-    testFailureCase(description, testCode, exceptedThrowableClass, LOG, exceptedCauseClasses);
+      Class<? extends Throwable> expectedThrowableClass,
+      Class<? extends Throwable>... expectedCauseClasses) {
+    return testFailureCase(description, testCode, expectedThrowableClass, LOG, expectedCauseClasses);
   }
 
   @SafeVarargs
-  public static void testFailureCaseAsync(
+  public static Throwable testFailureCaseAsync(
       String description, Supplier<CompletableFuture<?>> testCode,
-      Class<? extends Throwable> exceptedThrowableClass, Logger log,
-      Class<? extends Throwable>... exceptedCauseClasses) {
+      Class<? extends Throwable> expectedThrowableClass, Logger log,
+      Class<? extends Throwable>... expectedCauseClasses) {
     try {
       testCode.get().join();
-      Assert.fail("The test \"" + description + "\" does not throw anything.");
     } catch (Throwable t) {
       t = JavaUtils.unwrapCompletionException(t);
-      assertThrowable(description, t, exceptedThrowableClass, log, exceptedCauseClasses);
+      assertThrowable(description, t, expectedThrowableClass, log, expectedCauseClasses);
+      return t;
     }
+    throw new AssertionError("The test \"" + description + "\" does not throw anything.");
   }
 
   @SafeVarargs
-  public final void testFailureCaseAsync(
-      String description, Supplier<CompletableFuture<?>> testCode, Class<? extends Throwable> exceptedThrowableClass,
-      Class<? extends Throwable>... exceptedCauseClasses) {
-    testFailureCaseAsync(description, testCode, exceptedThrowableClass, LOG, exceptedCauseClasses);
+  public final Throwable testFailureCaseAsync(
+      String description, Supplier<CompletableFuture<?>> testCode, Class<? extends Throwable> expectedThrowableClass,
+      Class<? extends Throwable>... expectedCauseClasses) {
+    return testFailureCaseAsync(description, testCode, expectedThrowableClass, LOG, expectedCauseClasses);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index cf239b6..8a1b111 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -35,6 +35,7 @@ import org.apache.ratis.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolService
 import org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc;
 import org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceBlockingStub;
 import org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceStub;
+import org.apache.ratis.protocol.AlreadyClosedException;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.NotLeaderException;
 import org.apache.ratis.protocol.RaftClientReply;
@@ -201,7 +202,7 @@ public class GrpcClientProtocolClient implements Closeable {
     CompletableFuture<RaftClientReply> onNext(RaftClientRequest request) {
       final Map<Long, CompletableFuture<RaftClientReply>> map = replies.get();
       if (map == null) {
-        return JavaUtils.completeExceptionally(new IOException("Already closed."));
+        return JavaUtils.completeExceptionally(new AlreadyClosedException(getName() + " is closed."));
       }
       final CompletableFuture<RaftClientReply> f = new CompletableFuture<>();
       CollectionUtils.putNew(request.getCallId(), f, map,

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index f1f33e1..31faf35 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -103,11 +103,18 @@ public abstract class MiniRaftCluster implements Closeable {
       }
 
       default void runWithNewCluster(int numServers, CheckedConsumer<CLUSTER, Exception> testCase) throws Exception {
+        runWithNewCluster(numServers, true, testCase);
+      }
+
+      default void runWithNewCluster(int numServers, boolean startCluster, CheckedConsumer<CLUSTER, Exception> testCase)
+          throws Exception {
         final StackTraceElement caller = JavaUtils.getCallerStackTraceElement();
         LOG.info("Running " + caller.getMethodName());
         final CLUSTER cluster = newCluster(numServers);
         try {
-          cluster.start();
+          if (startCluster) {
+            cluster.start();
+          }
           testCase.accept(cluster);
         } catch(Throwable t) {
           LOG.error("Failed " + caller + ": " + cluster.printServers(), t);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 0719976..3821f5f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -25,6 +25,7 @@ import org.apache.ratis.client.impl.RaftClientTestUtil;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.protocol.AlreadyClosedException;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftGroup;
@@ -32,6 +33,7 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.RaftRetryFailureException;
 import org.apache.ratis.protocol.StateMachineException;
 import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.retry.RetryPolicies.RetryLimited;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerImpl;
@@ -42,6 +44,7 @@ import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferExce
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
 import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.function.CheckedRunnable;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -52,6 +55,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -95,31 +99,89 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
     }
   }
 
+  static void assertRaftRetryFailureException(RaftRetryFailureException rfe, RetryPolicy retryPolicy, String name) {
+    Assert.assertNotNull(name + " does not have RaftRetryFailureException", rfe);
+    Assert.assertTrue(name + ": unexpected error message, rfe=" + rfe + ", retryPolicy=" + retryPolicy,
+        rfe.getMessage().contains(retryPolicy.toString()));
+  }
+
   @Test
-  public void testRequestAsyncWithRetryPolicy() throws Exception {
-    runWithNewCluster(NUM_SERVERS, this::runTestRequestAsyncWithRetryPolicy);
+  public void testRequestAsyncWithRetryFailure() throws Exception {
+    runWithNewCluster(1, false, cluster -> runTestRequestAsyncWithRetryFailure(false, cluster));
   }
 
-  void runTestRequestAsyncWithRetryPolicy(CLUSTER cluster) throws Exception {
-    final RetryPolicy retryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
-        3, TimeDuration.valueOf(1, TimeUnit.SECONDS));
-    final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
-
-    try(final RaftClient writeClient = cluster.createClient(leader.getId(), retryPolicy)) {
-      // blockStartTransaction of the leader so that no transaction can be committed MAJORITY
-      LOG.info("block leader {}", leader.getId());
-      SimpleStateMachine4Testing.get(leader).blockStartTransaction();
-      final SimpleMessage[] messages = SimpleMessage.create(2);
-      final RaftClientReply reply = writeClient.sendAsync(messages[0]).get();
-      RaftRetryFailureException rfe = reply.getRetryFailureException();
-      Assert.assertNotNull(rfe);
-      Assert.assertTrue(rfe.getMessage().contains(retryPolicy.toString()));
-
-      // unblock leader so that the next transaction can be committed.
-      SimpleStateMachine4Testing.get(leader).unblockStartTransaction();
-      // make sure the the next request succeeds. This will ensure the first
-      // request completed
-      writeClient.sendAsync(messages[1]).get();
+  @Test
+  public void testRequestAsyncWithRetryFailureAfterInitialMessages() throws Exception {
+    runWithNewCluster(1, true, cluster -> runTestRequestAsyncWithRetryFailure(true, cluster));
+  }
+
+  void runTestRequestAsyncWithRetryFailure(boolean initialMessages, CLUSTER cluster) throws Exception {
+    final RetryLimited retryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(10, HUNDRED_MILLIS);
+
+    try(final RaftClient client = cluster.createClient(null, retryPolicy)) {
+      RaftPeerId leader = null;
+      if (initialMessages) {
+        // cluster is already started, send a few success messages
+        leader = RaftTestUtil.waitForLeader(cluster).getId();
+        final SimpleMessage[] messages = SimpleMessage.create(10, "initial-");
+        final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>();
+        for (int i = 0; i < messages.length; i++) {
+          replies.add(client.sendAsync(messages[i]));
+        }
+        for (int i = 0; i < messages.length; i++) {
+          RaftTestUtil.assertSuccessReply(replies.get(i));
+        }
+
+        // kill the only server
+        cluster.killServer(leader);
+      }
+
+      // now, either the cluster is not yet started or the server is killed.
+      final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>();
+      {
+        final SimpleMessage[] messages = SimpleMessage.create(10);
+        int i = 0;
+        // send half of the calls without starting the cluster
+        for (; i < messages.length/2; i++) {
+          replies.add(client.sendAsync(messages[i]));
+        }
+
+        // sleep most of the retry time
+        retryPolicy.getSleepTime().apply(t -> t * (retryPolicy.getMaxAttempts() - 1)).sleep();
+
+        // send another half of the calls without starting the cluster
+        for (; i < messages.length; i++) {
+          replies.add(client.sendAsync(messages[i]));
+        }
+        Assert.assertEquals(messages.length, replies.size());
+      }
+
+      // sleep again so that the first half calls will fail retries.
+      // the second half still have retry time remaining.
+      retryPolicy.getSleepTime().apply(t -> t*2).sleep();
+
+      if (leader != null) {
+        cluster.restartServer(leader, false);
+      } else {
+        cluster.start();
+      }
+
+      // all the calls should fail for ordering guarantee
+      for(int i = 0; i < replies.size(); i++) {
+        final CheckedRunnable<Exception> getReply = replies.get(i)::get;
+        final String name = "retry-failure-" + i;
+        if (i == 0) {
+          final Throwable t = testFailureCase(name, getReply,
+              ExecutionException.class, RaftRetryFailureException.class);
+          assertRaftRetryFailureException((RaftRetryFailureException) t.getCause(), retryPolicy, name);
+        } else {
+          testFailureCase(name, getReply,
+              ExecutionException.class, AlreadyClosedException.class, RaftRetryFailureException.class);
+        }
+      }
+
+      testFailureCaseAsync("last-request", () -> client.sendAsync(new SimpleMessage("last")),
+          AlreadyClosedException.class, RaftRetryFailureException.class);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index a96b917..6f0a20a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -23,6 +23,7 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
 import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
@@ -51,6 +52,8 @@ import java.util.EnumMap;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BooleanSupplier;
@@ -424,4 +427,13 @@ public interface RaftTestUtil {
     }
     return null;
   }
+
+  static void assertSuccessReply(CompletableFuture<RaftClientReply> reply) throws Exception {
+    assertSuccessReply(reply.get(10, TimeUnit.SECONDS));
+  }
+
+  static void assertSuccessReply(RaftClientReply reply) {
+    Assert.assertNotNull("reply == null", reply);
+    Assert.assertTrue("reply is not success: " + reply, reply.isSuccess());
+  }
 }


Mime
View raw message