ratis-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject incubator-ratis git commit: RATIS-359. Add timeout support for Watch requests.
Date Wed, 05 Dec 2018 01:20:35 GMT
Repository: incubator-ratis
Updated Branches:
  refs/heads/master a56bfcebd -> d72d9c6eb


RATIS-359. Add timeout support for Watch requests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/d72d9c6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/d72d9c6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/d72d9c6e

Branch: refs/heads/master
Commit: d72d9c6eb8cb32fa4058b5ab5710e9cd45e2653d
Parents: a56bfce
Author: Tsz Wo Nicholas Sze <szetszwo@apache.org>
Authored: Tue Dec 4 17:20:11 2018 -0800
Committer: Tsz Wo Nicholas Sze <szetszwo@apache.org>
Committed: Tue Dec 4 17:20:11 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/ratis/conf/ConfUtils.java   |   9 ++
 .../ratis/protocol/NotReplicatedException.java  |   6 +-
 .../apache/ratis/protocol/RaftClientReply.java  |   6 +
 .../java/org/apache/ratis/util/StringUtils.java |  19 +++
 .../org/apache/ratis/util/TimeDuration.java     |  23 +++
 .../java/org/apache/ratis/util/Timestamp.java   |  12 +-
 .../ratis/server/RaftServerConfigKeys.java      |  25 +++-
 .../apache/ratis/server/impl/LeaderState.java   |  16 +-
 .../ratis/server/impl/RaftServerImpl.java       |   2 +-
 .../apache/ratis/server/impl/WatchRequests.java | 113 ++++++++++----
 .../org/apache/ratis/WatchRequestTests.java     | 150 ++++++++++++++++---
 .../org/apache/ratis/util/TestTimeDuration.java |  18 ++-
 12 files changed, 338 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d72d9c6e/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
index 3ffd8be..98ee162 100644
--- a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
@@ -94,6 +94,15 @@ public interface ConfUtils {
     };
   }
 
+  static BiConsumer<String, TimeDuration> requirePositive() {
+    return (key, value) -> {
+      if (value.getDuration() <= 0) {
+        throw new IllegalArgumentException(
+            key + " = " + value + " is non-positive.");
+      }
+    };
+  }
+
   static BiFunction<String, Long, Integer> requireInt() {
     return (key, value) -> {
       try {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d72d9c6e/ratis-common/src/main/java/org/apache/ratis/protocol/NotReplicatedException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/NotReplicatedException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/NotReplicatedException.java
index 0a85b8e..c8643d7 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/NotReplicatedException.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/NotReplicatedException.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -25,8 +25,8 @@ public class NotReplicatedException extends RaftException {
   private final long logIndex;
 
   public NotReplicatedException(long callId, ReplicationLevel requiredReplication, long logIndex) {
-    super("Request with call Id " + callId + " is committed with log index " + logIndex
-        + " but not yet replicated to " + requiredReplication);
+    super("Request with call Id " + callId + " and log index " + logIndex
+        + " is not yet replicated to " + requiredReplication);
     this.callId = callId;
     this.requiredReplication = requiredReplication;
     this.logIndex = logIndex;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d72d9c6e/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 7b3979b..7a9574f 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -86,6 +86,12 @@ public class RaftClientReply extends RaftClientMessage {
         request.getCallId(), true, message, null, 0L, commitInfos);
   }
 
+  public RaftClientReply(RaftClientRequest request, NotReplicatedException nre,
+      Collection<CommitInfoProto> commitInfos) {
+    this(request.getClientId(), request.getServerId(), request.getRaftGroupId(),
+        request.getCallId(), false, request.getMessage(), nre, nre.getLogIndex(), commitInfos);
+  }
+
   public RaftClientReply(RaftClientReply reply, NotReplicatedException nre) {
     this(reply.getClientId(), reply.getServerId(), reply.getRaftGroupId(),
         reply.getCallId(), false, reply.getMessage(), nre, reply.getLogIndex(), reply.getCommitInfos());

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d72d9c6e/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
index be797a0..1a6d701 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 
 public class StringUtils {
@@ -146,4 +147,22 @@ public class StringUtils {
       return b.append("\n}").toString();
     }
   }
+
+  public static String completableFuture2String(CompletableFuture<?> future, boolean includeDetails) {
+    if (!future.isDone()) {
+      return "NOT_DONE";
+    } else if (future.isCancelled()) {
+      return "CANCELLED";
+    } else if (future.isCompletedExceptionally()) {
+      if (!includeDetails) {
+        return "EXCEPTION";
+      }
+      return future.thenApply(Objects::toString).exceptionally(Throwable::toString).join();
+    } else {
+      if (!includeDetails) {
+        return "COMPLETED";
+      }
+      return "" + future.join();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d72d9c6e/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
index 2fad806..7daa4dd 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
@@ -126,6 +126,29 @@ public class TimeDuration implements Comparable<TimeDuration> {
     return Math.toIntExact(toLong(targetUnit));
   }
 
+  public TimeDuration to(TimeUnit targetUnit) {
+    return valueOf(toLong(targetUnit), targetUnit);
+  }
+
+  /** Round up to the given nanos to nearest multiple (in nanoseconds) of this {@link TimeDuration}. */
+  public long roundUp(long nanos) {
+    if (duration <= 0) {
+      throw new ArithmeticException(
+          "Rounding up to a non-positive " + getClass().getSimpleName() + " (=" + this + ")");
+    }
+
+    final long divisor = unit.toNanos(duration);
+    if (nanos == 0 || divisor == 1) {
+      return nanos;
+    }
+
+    long remainder = nanos % divisor; // In Java, the sign of remainder is the same as the dividend.
+    if (remainder > 0) {
+      remainder -= divisor;
+    }
+    return nanos - remainder;
+  }
+
   /**
    * Apply the given operator to the duration value of this object.
    *

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d72d9c6e/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java b/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java
index 96e2a57..8ab3f6b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -29,6 +29,16 @@ public class Timestamp implements Comparable<Timestamp> {
 
   private static final long START_TIME = System.nanoTime();
 
+  /** @return a {@link Timestamp} for the given nanos. */
+  public static Timestamp valueOf(long nanos) {
+    return new Timestamp(nanos);
+  }
+
+  /** @return a long in nanos for the current time. */
+  public static long currentTimeNanos() {
+    return System.nanoTime();
+  }
+
   /** @return the latest timestamp. */
   public static Timestamp latest(Timestamp a, Timestamp b) {
     return a.compareTo(b) > 0? a: b;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d72d9c6e/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index ed08ca9..25d4b0c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -79,6 +79,29 @@ public interface RaftServerConfigKeys {
 
   }
 
+  String WATCH_TIMEOUT_DENOMINATION_KEY = PREFIX + ".watch.timeout.denomination";
+  TimeDuration WATCH_TIMEOUT_DENOMINATION_DEFAULT = TimeDuration.valueOf(1, TimeUnit.SECONDS);
+  static TimeDuration watchTimeoutDenomination(RaftProperties properties) {
+    return getTimeDuration(properties.getTimeDuration(WATCH_TIMEOUT_DENOMINATION_DEFAULT.getUnit()),
+        WATCH_TIMEOUT_DENOMINATION_KEY, WATCH_TIMEOUT_DENOMINATION_DEFAULT, getDefaultLog(), requirePositive());
+  }
+  static void setWatchTimeoutDenomination(RaftProperties properties, TimeDuration watchTimeout) {
+    setTimeDuration(properties::setTimeDuration, WATCH_TIMEOUT_DENOMINATION_KEY, watchTimeout);
+  }
+
+  /**
+   * Timeout for watch requests.
+   */
+  String WATCH_TIMEOUT_KEY = PREFIX + ".watch.timeout";
+  TimeDuration WATCH_TIMEOUT_DEFAULT = TimeDuration.valueOf(10, TimeUnit.SECONDS);
+  static TimeDuration watchTimeout(RaftProperties properties) {
+    return getTimeDuration(properties.getTimeDuration(WATCH_TIMEOUT_DEFAULT.getUnit()),
+        WATCH_TIMEOUT_KEY, WATCH_TIMEOUT_DEFAULT, getDefaultLog(), requirePositive());
+  }
+  static void setWatchTimeout(RaftProperties properties, TimeDuration watchTimeout) {
+    setTimeDuration(properties::setTimeDuration, WATCH_TIMEOUT_KEY, watchTimeout);
+  }
+
   interface Log {
     String PREFIX = RaftServerConfigKeys.PREFIX + ".log";
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d72d9c6e/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 1bc6e79..924a7d0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -35,6 +35,7 @@ import java.util.*;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
@@ -199,7 +200,7 @@ public class LeaderState {
     this.currentTerm = state.getCurrentTerm();
     processor = new EventProcessor();
     this.pendingRequests = new PendingRequests(server.getId());
-    this.watchRequests = new WatchRequests(server);
+    this.watchRequests = new WatchRequests(server.getId(), properties);
 
     final RaftConfiguration conf = server.getRaftConf();
     Collection<RaftPeer> others = conf.getOtherPeers(state.getSelfId());
@@ -299,9 +300,18 @@ public class LeaderState {
     return pendingRequests.add(request, entry);
   }
 
-  CompletableFuture<Void> addWatchReqeust(RaftClientRequest request) {
+  CompletableFuture<RaftClientReply> addWatchReqeust(RaftClientRequest request) {
     LOG.debug("{}: addWatchRequest {}", server.getId(), request);
-    return watchRequests.add(request.getType().getWatch());
+    return watchRequests.add(request)
+        .thenApply(v -> new RaftClientReply(request, server.getCommitInfos()))
+        .exceptionally(e -> {
+          e = JavaUtils.unwrapCompletionException(e);
+          if (e instanceof NotReplicatedException) {
+            return new RaftClientReply(request, (NotReplicatedException)e, server.getCommitInfos());
+          } else {
+            throw new CompletionException(e);
+          }
+        });
   }
 
   void commitIndexChanged() {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d72d9c6e/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 4ea78ce..ed74866 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -556,7 +556,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
 
   private CompletableFuture<RaftClientReply> watchAsync(RaftClientRequest request) {
     return role.getLeaderState()
-        .map(ls -> ls.addWatchReqeust(request).thenApply(v -> new RaftClientReply(request, getCommitInfos())))
+        .map(ls -> ls.addWatchReqeust(request))
         .orElseGet(() -> CompletableFuture.completedFuture(
             new RaftClientReply(request, generateNotLeaderException(), getCommitInfos())));
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d72d9c6e/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
index b7d6635..912d12d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,10 +17,13 @@
  */
 package org.apache.ratis.server.impl;
 
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.proto.RaftProtos.WatchRequestTypeProto;
+import org.apache.ratis.protocol.NotReplicatedException;
 import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,37 +31,48 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.EnumMap;
 import java.util.Map;
-import java.util.PriorityQueue;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 class WatchRequests {
   public static final Logger LOG = LoggerFactory.getLogger(WatchRequests.class);
 
   static class PendingWatch {
     private final WatchRequestTypeProto watch;
-    private final CompletableFuture<Void> future = new CompletableFuture<>();
+    private final Timestamp creationTime;
+    private final Supplier<CompletableFuture<Void>> future = JavaUtils.memoize(CompletableFuture::new);
 
-    PendingWatch(WatchRequestTypeProto watch) {
+    PendingWatch(WatchRequestTypeProto watch, Timestamp creationTime) {
       this.watch = watch;
+      this.creationTime = creationTime;
     }
 
     CompletableFuture<Void> getFuture() {
-      return future;
+      return future.get();
     }
 
     long getIndex() {
       return watch.getIndex();
     }
 
+    Timestamp getCreationTime() {
+      return creationTime;
+    }
+
     @Override
     public String toString() {
-      return RaftClientRequest.Type.toString(watch);
+      return RaftClientRequest.Type.toString(watch) + "@" + creationTime
+          + "?" + StringUtils.completableFuture2String(future.get(), true);
     }
   }
 
   private class WatchQueue {
     private final ReplicationLevel replication;
-    private final PriorityQueue<PendingWatch> q = new PriorityQueue<>(Comparator.comparing(PendingWatch::getIndex));
+    private final SortedMap<PendingWatch, PendingWatch> q = new TreeMap<>(
+        Comparator.comparingLong(PendingWatch::getIndex).thenComparing(PendingWatch::getCreationTime));
     private volatile long index; //Invariant: q.isEmpty() or index < any element q
 
     WatchQueue(ReplicationLevel replication) {
@@ -69,13 +83,43 @@ class WatchRequests {
       return index;
     }
 
-    synchronized boolean offer(PendingWatch pending) {
-      if (pending.getIndex() > getIndex()) { // compare again synchronized
-        final boolean offered = q.offer(pending);
-        Preconditions.assertTrue(offered);
-        return true;
+    PendingWatch add(RaftClientRequest request) {
+      final long currentTime = Timestamp.currentTimeNanos();
+      final long roundUp = watchTimeoutDenominationNanos.roundUp(currentTime);
+      final PendingWatch pending = new PendingWatch(request.getType().getWatch(), Timestamp.valueOf(roundUp));
+
+      synchronized (this) {
+        if (pending.getIndex() > getIndex()) { // compare again synchronized
+          final PendingWatch previous = q.putIfAbsent(pending, pending);
+          if (previous != null) {
+            return previous;
+          }
+        } else {
+          return null;
+        }
+      }
+
+      final TimeDuration timeout = watchTimeoutNanos.apply(duration -> duration + roundUp - currentTime);
+      scheduler.onTimeout(timeout, () -> handleTimeout(request, pending),
+          LOG, () -> name + ": Failed to timeout " + request);
+      return pending;
+    }
+
+    void handleTimeout(RaftClientRequest request, PendingWatch pending) {
+      if (removeExisting(pending)) {
+        pending.getFuture().completeExceptionally(
+            new NotReplicatedException(request.getCallId(), replication, pending.getIndex()));
+        LOG.debug("{}: timeout {}, {}", name, pending, request);
       }
-      return false;
+    }
+
+    synchronized boolean removeExisting(PendingWatch pending) {
+      final PendingWatch removed = q.remove(pending);
+      if (removed == null) {
+        return false;
+      }
+      Preconditions.assertTrue(removed == pending);
+      return true;
     }
 
     synchronized void updateIndex(final long newIndex) {
@@ -85,38 +129,53 @@ class WatchRequests {
       LOG.debug("{}: update {} index from {} to {}", name, replication, index, newIndex);
       index = newIndex;
 
-      for(;;) {
-        final PendingWatch peeked = q.peek();
-        if (peeked == null || peeked.getIndex() > newIndex) {
+      for(; !q.isEmpty();) {
+        final PendingWatch first = q.firstKey();
+        if (first.getIndex() > newIndex) {
           return;
         }
-        final PendingWatch polled = q.poll();
-        Preconditions.assertTrue(polled == peeked);
-        LOG.debug("{}: complete {}", name, polled);
-        polled.getFuture().complete(null);
+        final boolean removed = removeExisting(first);
+        Preconditions.assertTrue(removed);
+        LOG.debug("{}: complete {}", name, first);
+        first.getFuture().complete(null);
       }
     }
 
     synchronized void failAll(Exception e) {
-      for(; !q.isEmpty(); ) {
-        q.poll().getFuture().completeExceptionally(e);
+      for(PendingWatch pending : q.values()) {
+        pending.getFuture().completeExceptionally(e);
       }
+      q.clear();
     }
   }
 
   private final String name;
   private final Map<ReplicationLevel, WatchQueue> queues = new EnumMap<>(ReplicationLevel.class);
 
-  WatchRequests(Object name) {
+  private final TimeDuration watchTimeoutNanos;
+  private final TimeDuration watchTimeoutDenominationNanos;
+  private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(2);
+
+  WatchRequests(Object name, RaftProperties properties) {
     this.name = name + "-" + getClass().getSimpleName();
+
+    final TimeDuration watchTimeout = RaftServerConfigKeys.watchTimeout(properties);
+    this.watchTimeoutNanos = watchTimeout.to(TimeUnit.NANOSECONDS);
+    final TimeDuration watchTimeoutDenomination = RaftServerConfigKeys.watchTimeoutDenomination(properties);
+    this.watchTimeoutDenominationNanos = watchTimeoutDenomination.to(TimeUnit.NANOSECONDS);
+    Preconditions.assertTrue(watchTimeoutNanos.getDuration() % watchTimeoutDenominationNanos.getDuration() == 0L,
+        () -> "watchTimeout (=" + watchTimeout + ") is not a multiple of watchTimeoutDenomination (="
+            + watchTimeoutDenomination + ").");
+
     Arrays.stream(ReplicationLevel.values()).forEach(r -> queues.put(r, new WatchQueue(r)));
   }
 
-  CompletableFuture<Void> add(WatchRequestTypeProto watch) {
+  CompletableFuture<Void> add(RaftClientRequest request) {
+    final WatchRequestTypeProto watch = request.getType().getWatch();
     final WatchQueue queue = queues.get(watch.getReplication());
     if (watch.getIndex() > queue.getIndex()) { // compare without synchronization
-      final PendingWatch pending = new PendingWatch(watch);
-      if (queue.offer(pending)) {
+      final PendingWatch pending = queue.add(request);
+      if (pending != null) {
         return pending.getFuture();
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d72d9c6e/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
index dbe5865..a43c42b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,19 +19,17 @@ package org.apache.ratis;
 
 import org.apache.log4j.Level;
 import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.protocol.NotReplicatedException;
 import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.util.CheckedFunction;
-import org.apache.ratis.util.LogUtils;
-import org.apache.ratis.util.ProtoUtils;
-import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.*;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -108,8 +106,8 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
               watchMajorityClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY),
               watchAllClient.sendWatchAsync(logIndex, ReplicationLevel.ALL),
               watchMajorityCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY_COMMITTED),
-              watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED)
-          ));
+              watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED),
+              log));
         });
       }
     }
@@ -120,7 +118,7 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
     }
   }
 
-  static void runTest(CheckedFunction<TestParameters, Void, Exception> testCase, MiniRaftCluster cluster, Logger LOG) throws Exception {
+  static void runTest(CheckedConsumer<TestParameters, Exception> testCase, MiniRaftCluster cluster, Logger LOG) throws Exception {
     try(final RaftClient writeClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId());
         final RaftClient watchMajorityClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId());
         final RaftClient watchAllClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId());
@@ -133,7 +131,7 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
             n, writeClient, watchMajorityClient, watchAllClient,
             watchMajorityCommittedClient, watchAllCommittedClient, cluster, LOG);
         LOG.info("{}) {}, {}", i, p, cluster.printServers());
-        testCase.apply(p);
+        testCase.accept(p);
       }
     }
   }
@@ -144,19 +142,45 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
     private final CompletableFuture<RaftClientReply> all;
     private final CompletableFuture<RaftClientReply> majorityCommitted;
     private final CompletableFuture<RaftClientReply> allCommitted;
+    private final Logger log;
 
     WatchReplies(long logIndex,
         CompletableFuture<RaftClientReply> majority, CompletableFuture<RaftClientReply> all,
-        CompletableFuture<RaftClientReply> majorityCommitted, CompletableFuture<RaftClientReply> allCommitted) {
+        CompletableFuture<RaftClientReply> majorityCommitted, CompletableFuture<RaftClientReply> allCommitted, Logger log) {
       this.logIndex = logIndex;
       this.majority = majority;
       this.all = all;
       this.majorityCommitted = majorityCommitted;
       this.allCommitted = allCommitted;
+      this.log = log;
+    }
+
+    RaftClientReply getMajority() throws Exception {
+      final RaftClientReply reply = majority.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      log.info("watchMajorityReply({}) = {}", logIndex, reply);
+      return reply;
+    }
+
+    RaftClientReply getMajorityCommitted() throws Exception {
+      final RaftClientReply reply = majorityCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      log.info("watchMajorityCommittedReply({}) = {}", logIndex, reply);
+      return reply;
+    }
+
+    RaftClientReply getAll() throws Exception {
+      final RaftClientReply reply = all.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      log.info("watchAllReply({}) = {}", logIndex, reply);
+      return reply;
+    }
+
+    RaftClientReply getAllCommitted() throws Exception {
+      final RaftClientReply reply = allCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      log.info("watchAllCommittedReply({}) = {}", logIndex, reply);
+      return reply;
     }
   }
 
-  static Void runTestWatchRequestAsync(TestParameters p) throws Exception {
+  static void runTestWatchRequestAsync(TestParameters p) throws Exception {
     final Logger LOG = p.log;
     final MiniRaftCluster cluster = p.cluster;
     final int numMessages = p.numMessages;
@@ -203,7 +227,6 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
     LOG.info("unblock follower {}", blockedFollower.getId());
     SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData();
     checkAll(watches, LOG);
-    return null;
   }
 
   static void checkMajority(List<CompletableFuture<RaftClientReply>> replies,
@@ -216,13 +239,10 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
 
       final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
       Assert.assertEquals(logIndex, watchReplies.logIndex);
-      final RaftClientReply watchMajorityReply = watchReplies.majority.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-      LOG.info("watchMajorityReply({}) = {}", logIndex, watchMajorityReply);
+      final RaftClientReply watchMajorityReply = watchReplies.getMajority();
       Assert.assertTrue(watchMajorityReply.isSuccess());
 
-      final RaftClientReply watchMajorityCommittedReply
-          = watchReplies.majorityCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-      LOG.info("watchMajorityCommittedReply({}) = {}", logIndex, watchMajorityCommittedReply);
+      final RaftClientReply watchMajorityCommittedReply = watchReplies.getMajorityCommitted();
       Assert.assertTrue(watchMajorityCommittedReply.isSuccess());
       { // check commit infos
         final Collection<CommitInfoProto> commitInfos = watchMajorityCommittedReply.getCommitInfos();
@@ -246,12 +266,10 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
       final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
       final long logIndex = watchReplies.logIndex;
       LOG.info("checkAll {}: logIndex={}", i, logIndex);
-      final RaftClientReply watchAllReply = watchReplies.all.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-      LOG.info("watchAllReply({}) = {}", logIndex, watchAllReply);
+      final RaftClientReply watchAllReply = watchReplies.getAll();
       Assert.assertTrue(watchAllReply.isSuccess());
 
-      final RaftClientReply watchAllCommittedReply = watchReplies.allCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-      LOG.info("watchAllCommittedReply({}) = {}", logIndex, watchAllCommittedReply);
+      final RaftClientReply watchAllCommittedReply = watchReplies.getAllCommitted();
       Assert.assertTrue(watchAllCommittedReply.isSuccess());
       { // check commit infos
         final Collection<CommitInfoProto> commitInfos = watchAllCommittedReply.getCommitInfos();
@@ -284,7 +302,7 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
         cluster -> runTest(WatchRequestTests::runTestWatchRequestAsyncChangeLeader, cluster, LOG));
   }
 
-  static Void runTestWatchRequestAsyncChangeLeader(TestParameters p) throws Exception {
+  static void runTestWatchRequestAsyncChangeLeader(TestParameters p) throws Exception {
     final Logger LOG = p.log;
     final MiniRaftCluster cluster = p.cluster;
     final int numMessages = p.numMessages;
@@ -317,6 +335,90 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
     SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData();
     LOG.info("unblock follower {}", blockedFollower.getId());
     checkAll(watches, LOG);
-    return null;
+  }
+
+  @Test
+  public void testWatchRequestTimeout() throws Exception {
+    final RaftProperties p = getProperties();
+    RaftServerConfigKeys.setWatchTimeout(p, TimeDuration.valueOf(500, TimeUnit.MILLISECONDS));
+    RaftServerConfigKeys.setWatchTimeoutDenomination(p, TimeDuration.valueOf(100, TimeUnit.MILLISECONDS));
+    try {
+      runWithNewCluster(NUM_SERVERS,
+          cluster -> runTest(WatchRequestTests::runTestWatchRequestTimeout, cluster, LOG));
+    } finally {
+      RaftServerConfigKeys.setWatchTimeout(p, RaftServerConfigKeys.WATCH_TIMEOUT_DEFAULT);
+      RaftServerConfigKeys.setWatchTimeoutDenomination(p, RaftServerConfigKeys.WATCH_TIMEOUT_DENOMINATION_DEFAULT);
+    }
+  }
+
+  static void runTestWatchRequestTimeout(TestParameters p) throws Exception {
+    final Logger LOG = p.log;
+    final MiniRaftCluster cluster = p.cluster;
+    final int numMessages = p.numMessages;
+
+    final TimeDuration watchTimeout = RaftServerConfigKeys.watchTimeout(cluster.getProperties());
+    final TimeDuration watchTimeoutDenomination = RaftServerConfigKeys.watchTimeoutDenomination(cluster.getProperties());
+
+    // blockStartTransaction of the leader so that no transaction can be committed MAJORITY
+    final RaftServerImpl leader = cluster.getLeader();
+    LOG.info("block leader {}", leader.getId());
+    SimpleStateMachine4Testing.get(leader).blockStartTransaction();
+
+    // blockFlushStateMachineData a follower so that no transaction can be ALL_COMMITTED
+    final List<RaftServerImpl> followers = cluster.getFollowers();
+    final RaftServerImpl blockedFollower = followers.get(ThreadLocalRandom.current().nextInt(followers.size()));
+    LOG.info("block follower {}", blockedFollower.getId());
+    SimpleStateMachine4Testing.get(blockedFollower).blockFlushStateMachineData();
+
+    // send a message
+    final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>();
+    final List<CompletableFuture<WatchReplies>> watches = new ArrayList<>();
+
+    p.sendRequests(replies, watches);
+
+    Assert.assertEquals(numMessages, replies.size());
+    Assert.assertEquals(numMessages, watches.size());
+
+    watchTimeout.sleep();
+    watchTimeoutDenomination.sleep(); // for roundup error
+    assertNotDone(replies);
+    assertNotDone(watches);
+
+    // unblock leader so that the transaction can be committed.
+    SimpleStateMachine4Testing.get(leader).unblockStartTransaction();
+    LOG.info("unblock leader {}", leader.getId());
+
+    checkMajority(replies, watches, LOG);
+    checkTimeout(replies, watches, LOG);
+
+    SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData();
+    LOG.info("unblock follower {}", blockedFollower.getId());
+  }
+
+  static void checkTimeout(List<CompletableFuture<RaftClientReply>> replies,
+      List<CompletableFuture<WatchReplies>> watches, Logger LOG) throws Exception {
+    for(int i = 0; i < replies.size(); i++) {
+      final RaftClientReply reply = replies.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      LOG.info("checkTimeout {}: receive {}", i, reply);
+      final long logIndex = reply.getLogIndex();
+      Assert.assertTrue(reply.isSuccess());
+
+      final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      Assert.assertEquals(logIndex, watchReplies.logIndex);
+
+      final RaftClientReply watchAllReply = watchReplies.getAll();
+      assertNotReplicatedException(logIndex, ReplicationLevel.ALL, watchAllReply);
+
+      final RaftClientReply watchAllCommittedReply = watchReplies.getAllCommitted();
+      assertNotReplicatedException(logIndex, ReplicationLevel.ALL_COMMITTED, watchAllCommittedReply);
+    }
+  }
+
+  static void assertNotReplicatedException(long logIndex, ReplicationLevel replication, RaftClientReply reply) {
+    Assert.assertFalse(reply.isSuccess());
+    final NotReplicatedException nre = reply.getNotReplicatedException();
+    Assert.assertNotNull(nre);
+    Assert.assertEquals(logIndex, nre.getLogIndex());
+    Assert.assertEquals(replication, nre.getRequiredReplication());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d72d9c6e/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java
index 06d9301..782d80d 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java
@@ -19,6 +19,7 @@ package org.apache.ratis.util;
 
 import org.junit.Test;
 
+import java.sql.Time;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -31,7 +32,7 @@ import static org.junit.Assert.assertNotNull;
 
 public class TestTimeDuration {
   @Test(timeout = 1000)
-  public void testTimeDuration() throws Exception {
+  public void testTimeDuration() {
     Arrays.asList(TimeUnit.values())
         .forEach(a -> assertNotNull(Abbreviation.valueOf(a.name())));
     assertEquals(TimeUnit.values().length, Abbreviation.values().length);
@@ -81,4 +82,19 @@ public class TestTimeDuration {
     assertEquals(240, parse("10 day", TimeUnit.HOURS));
     assertEquals(2400, parse("100 days", TimeUnit.HOURS));
   }
+
+  @Test(timeout = 1000)
+  public void testRoundUp() {
+    final long nanosPerSecond = 1_000_000_000L;
+    final TimeDuration oneSecond = TimeDuration.valueOf(1, TimeUnit.SECONDS);
+    assertEquals(-nanosPerSecond, oneSecond.roundUp(-nanosPerSecond - 1));
+    assertEquals(-nanosPerSecond, oneSecond.roundUp(-nanosPerSecond));
+    assertEquals(0, oneSecond.roundUp(-nanosPerSecond + 1));
+    assertEquals(0, oneSecond.roundUp(-1));
+    assertEquals(0, oneSecond.roundUp(0));
+    assertEquals(nanosPerSecond, oneSecond.roundUp(1));
+    assertEquals(nanosPerSecond, oneSecond.roundUp(nanosPerSecond - 1));
+    assertEquals(nanosPerSecond, oneSecond.roundUp(nanosPerSecond));
+    assertEquals(2*nanosPerSecond, oneSecond.roundUp(nanosPerSecond + 1));
+  }
 }



Mime
View raw message