kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From granthe...@apache.org
Subject [kudu] branch master updated: KUDU-2192: Enable TCP keepalive for all outbound connections
Date Thu, 27 Jun 2019 23:42:10 GMT
This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 89c02fd  KUDU-2192: Enable TCP keepalive for all outbound connections
89c02fd is described below

commit 89c02fded7595b4712b465bfb939e4f3035b2e75
Author: Michael Ho <kwho@cloudera.com>
AuthorDate: Thu Jun 20 18:34:02 2019 -0700

    KUDU-2192: Enable TCP keepalive for all outbound connections
    
    This change enables TCP keepalive for all outbound connections.
    This aims to handle cases in which the remote peer may have
    dropped off the network without sending a TCP RST. For instance,
    a remote host could have hit a kernel panic and been power cycled,
    in which case the existing TCP connection to that host may be
    stale. In an idle cluster, this stale connection may not be detected
    until its next use, at which point it results in an RPC failure
    due to the TCP RST sent from the restarted peer.
    
    By enabling TCP keepalive, we ensure that stale TCP connections
    in an idle cluster will be detected and closed within a time bound
    so a new connection will be created on the next use. This change
    introduces 3 different flags:
    
    --tcp_keepalive_probe_period_s: the duration in seconds a TCP connection
    must be idle before keepalive probes start being sent.
    
    --tcp_keepalive_retry_period_s: the duration in seconds between successive
    keepalive probes if previous probes were not ACKed by the remote peer.
    
    --tcp_keepalive_retry_count: the maximum number of TCP keepalive probes
    sent without an ACK before declaring the remote peer as dead.
    
    Testing:
    - Used tcpdump to verify that keepalive probes are being sent periodically.
    - Verified that blocking all incoming traffic to a server's port via an iptables
    rule caused the TCP connection to be closed and the keepalive probes to stop
    eventually.
    
    Change-Id: Iaa1d66d83aea1cc82d07fc6217be5fc1306695bc
    Reviewed-on: http://gerrit.cloudera.org:8080/13702
    Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
    Reviewed-by: Todd Lipcon <todd@apache.org>
    Tested-by: Kudu Jenkins
---
 src/kudu/rpc/connection.cc  |  8 ++++++++
 src/kudu/rpc/connection.h   |  7 +++++++
 src/kudu/rpc/reactor.cc     | 33 ++++++++++++++++++++++++++++++++-
 src/kudu/rpc/reactor.h      |  4 ++++
 src/kudu/rpc/rpc-test.cc    | 37 ++++++++++++++++++++++++++++++++++++-
 src/kudu/util/net/socket.cc | 18 ++++++++++++++++++
 src/kudu/util/net/socket.h  |  7 +++++++
 7 files changed, 112 insertions(+), 2 deletions(-)

diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index 7a32a30..2b79464 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -217,6 +217,14 @@ Status Connection::SetNonBlocking(bool enabled) {
   return socket_->SetNonBlocking(enabled);
 }
 
+Status Connection::SetTcpKeepAlive(int idle_time_s, int retry_time_s, int num_retries) {
+  DCHECK_GT(idle_time_s, 0);
+  DCHECK_GE(retry_time_s, 0);
+  DCHECK_GE(num_retries, 0);
+  return socket_->SetTcpKeepAlive(std::max(1, idle_time_s), std::max(0, retry_time_s),
+      std::max(0, num_retries));
+}
+
 void Connection::EpollRegister(ev::loop_ref& loop) {
   DCHECK(reactor_thread_->IsCurrentThread());
   DVLOG(4) << "Registering connection for epoll: " << ToString();
diff --git a/src/kudu/rpc/connection.h b/src/kudu/rpc/connection.h
index 9fb6a6c..8f80b5a 100644
--- a/src/kudu/rpc/connection.h
+++ b/src/kudu/rpc/connection.h
@@ -99,6 +99,13 @@ class Connection : public RefCountedThreadSafe<Connection> {
   // Set underlying socket to non-blocking (or blocking) mode.
   Status SetNonBlocking(bool enabled);
 
+  // Enable TCP keepalive for the underlying socket. A TCP keepalive probe will be sent
+  // to the remote end after the connection has been idle for 'idle_time_s' seconds.
+  // It will retry sending probes up to 'num_retries' number of times until an ACK is
+  // heard from peer. 'retry_time_s' is the sleep time in seconds between successive
+  // keepalive probes.
+  Status SetTcpKeepAlive(int idle_time_s, int retry_time_s, int num_retries);
+
   // Register our socket with an epoll loop.  We will only ever be registered in
   // one epoll loop at a time.
   void EpollRegister(ev::loop_ref& loop);
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index b9308b5..a3e56f7 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -49,6 +49,7 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
+#include "kudu/util/random_util.h"
 #include "kudu/util/status.h"
 #include "kudu/util/thread.h"
 #include "kudu/util/thread_restrictions.h"
@@ -80,6 +81,21 @@ DEFINE_bool(rpc_reopen_outbound_connections, false,
 TAG_FLAG(rpc_reopen_outbound_connections, unsafe);
 TAG_FLAG(rpc_reopen_outbound_connections, runtime);
 
+DEFINE_int32(tcp_keepalive_probe_period_s, 60,
+             "The duration in seconds after an outbound connection has gone idle "
+             "before a TCP keepalive probe is sent to the peer. Set to 0 to disable "
+             "TCP keepalive probes from being sent.");
+DEFINE_int32(tcp_keepalive_retry_period_s, 3,
+             "The duration in seconds between successive keepalive probes from an "
+             "outbound connection if the previous probes are not acknowledged. "
+             "Effective only if --tcp_keepalive_probe_period_s is not 0.");
+DEFINE_int32(tcp_keepalive_retry_count, 10,
+             "The maximum number of keepalive probes sent before declaring the remote "
+             "end as dead. Effective only if --tcp_keepalive_probe_period_s is not 0.");
+TAG_FLAG(tcp_keepalive_probe_period_s, advanced);
+TAG_FLAG(tcp_keepalive_retry_period_s, advanced);
+TAG_FLAG(tcp_keepalive_retry_count, advanced);
+
 METRIC_DEFINE_histogram(server, reactor_load_percent,
                         "Reactor Thread Load Percentage",
                         kudu::MetricUnit::kUnits,
@@ -133,7 +149,8 @@ ReactorThread::ReactorThread(Reactor *reactor, const MessengerBuilder& bld)
     connection_keepalive_time_(bld.connection_keepalive_time_),
     coarse_timer_granularity_(bld.coarse_timer_granularity_),
     total_client_conns_cnt_(0),
-    total_server_conns_cnt_(0) {
+    total_server_conns_cnt_(0),
+    rng_(GetRandomSeed32()) {
 
   if (bld.metric_entity_) {
     invoke_us_histogram_ =
@@ -608,6 +625,20 @@ void ReactorThread::CompleteConnectionNegotiation(
     return;
   }
 
+  if (FLAGS_tcp_keepalive_probe_period_s > 0) {
+    // Try spreading out the idle poll period to avoid thundering herd in case connections
+    // are all created at the same time (e.g. after a cluster is restarted).
+    Status keepalive_status = conn->SetTcpKeepAlive(
+        FLAGS_tcp_keepalive_probe_period_s + rng_.Uniform32(4),
+        FLAGS_tcp_keepalive_retry_period_s, FLAGS_tcp_keepalive_retry_count);
+    if (PREDICT_FALSE(!keepalive_status.ok())) {
+      LOG(DFATAL) << "Unable to set TCP keepalive for connection: "
+                  << keepalive_status.ToString();
+      DestroyConnection(conn.get(), keepalive_status, std::move(rpc_error));
+      return;
+    }
+  }
+
   conn->MarkNegotiationComplete();
   conn->EpollRegister(loop_);
 }
diff --git a/src/kudu/rpc/reactor.h b/src/kudu/rpc/reactor.h
index a74a3bb..4ab7ef6 100644
--- a/src/kudu/rpc/reactor.h
+++ b/src/kudu/rpc/reactor.h
@@ -37,6 +37,7 @@
 #include "kudu/util/locks.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/random.h"
 #include "kudu/util/status.h"
 #include "kudu/util/thread.h"
 
@@ -327,6 +328,9 @@ class ReactorThread {
   // started.
   int64_t total_poll_cycles_ = 0;
 
+  // Random number generator for randomizing the TCP keepalive interval.
+  Random rng_;
+
   // Accounting for determining load average in each cycle of TimerHandler.
   struct {
     // The cycle-time at which the load average was last calculated.
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 0d70bbf..88fe7fa 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -72,6 +72,9 @@ METRIC_DECLARE_histogram(rpc_incoming_queue_time);
 
 DECLARE_bool(rpc_reopen_outbound_connections);
 DECLARE_int32(rpc_negotiation_inject_delay_ms);
+DECLARE_int32(tcp_keepalive_probe_period_s);
+DECLARE_int32(tcp_keepalive_retry_period_s);
+DECLARE_int32(tcp_keepalive_retry_count);
 
 using std::shared_ptr;
 using std::string;
@@ -781,6 +784,34 @@ TEST_P(TestRpc, TestCallLongerThanKeepalive) {
                                  req, &resp, &controller));
 }
 
+// Test a call which leaves the TCP connection idle for extended period of time
+// and verifies that the call succeeds (i.e. the connection is not closed).
+TEST_P(TestRpc, TestTCPKeepalive) {
+  // Set up server.
+  Sockaddr server_addr;
+  bool enable_ssl = GetParam();
+  ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+
+  // Set up client.
+  FLAGS_tcp_keepalive_probe_period_s = 1;
+  FLAGS_tcp_keepalive_retry_period_s = 1;
+  FLAGS_tcp_keepalive_retry_count = 1;
+  shared_ptr<Messenger> client_messenger;
+  ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+      GenericCalculatorService::static_service_name());
+
+  // Make a call which sleeps for longer than TCP keepalive probe period,
+  // triggering TCP keepalive probes.
+  RpcController controller;
+  SleepRequestPB req;
+  req.set_sleep_micros(8 * 1000 * 1000); // 8 seconds.
+  req.set_deferred(true);
+  SleepResponsePB resp;
+  ASSERT_OK(p.SyncRequest(GenericCalculatorService::kSleepMethodName,
+      req, &resp, &controller));
+}
+
 // Test that the RpcSidecar transfers the expected messages.
 TEST_P(TestRpc, TestRpcSidecar) {
   // Set up server.
@@ -1067,11 +1098,15 @@ TEST_F(TestRpc, TestServerShutsDown) {
     // EINVAL is possible if the controller socket had already disconnected by
     // the time it trys to set the SO_SNDTIMEO socket option as part of the
     // normal blocking SASL handshake.
+    //
+    // ENOTCONN is possible simply because the server closes the connection
+    // after the connection is established.
     ASSERT_TRUE(s.posix_code() == EPIPE ||
                 s.posix_code() == ECONNRESET ||
                 s.posix_code() == ESHUTDOWN ||
                 s.posix_code() == ECONNREFUSED ||
-                s.posix_code() == EINVAL)
+                s.posix_code() == EINVAL ||
+                s.posix_code() == ENOTCONN)
       << "Unexpected status: " << s.ToString();
   }
 }
diff --git a/src/kudu/util/net/socket.cc b/src/kudu/util/net/socket.cc
index 2ffb5cf..018b16c 100644
--- a/src/kudu/util/net/socket.cc
+++ b/src/kudu/util/net/socket.cc
@@ -585,6 +585,24 @@ Status Socket::SetTimeout(int opt, const char* optname, const MonoDelta& timeout
   return Status::OK();
 }
 
+Status Socket::SetTcpKeepAlive(int idle_time_s, int retry_time_s, int num_retries) {
+#if defined(__linux__)
+  static const char* const err_string = "failed to set socket option $0 to $1";
+  DCHECK_GT(idle_time_s, 0);
+  RETURN_NOT_OK_PREPEND(SetSockOpt(IPPROTO_TCP, TCP_KEEPIDLE, idle_time_s),
+      Substitute(err_string, "TCP_KEEPIDLE", idle_time_s));
+  DCHECK_GT(retry_time_s, 0);
+  RETURN_NOT_OK_PREPEND(SetSockOpt(IPPROTO_TCP, TCP_KEEPINTVL, retry_time_s),
+      Substitute(err_string, "TCP_KEEPINTVL", retry_time_s));
+  DCHECK_GT(num_retries, 0);
+  RETURN_NOT_OK_PREPEND(SetSockOpt(IPPROTO_TCP, TCP_KEEPCNT, num_retries),
+      Substitute(err_string, "TCP_KEEPCNT", num_retries));
+  RETURN_NOT_OK_PREPEND(SetSockOpt(SOL_SOCKET, SO_KEEPALIVE, 1),
+      "failed to enable TCP KeepAlive socket option");
+#endif
+  return Status::OK();
+}
+
 template<typename T>
 Status Socket::SetSockOpt(int level, int option, const T& value) {
   if (::setsockopt(fd_, level, option, &value, sizeof(T)) == -1) {
diff --git a/src/kudu/util/net/socket.h b/src/kudu/util/net/socket.h
index 992c44a..0f561a7 100644
--- a/src/kudu/util/net/socket.h
+++ b/src/kudu/util/net/socket.h
@@ -153,6 +153,13 @@ class Socket {
   // See also readn() from Stevens (2004) or Kerrisk (2010)
   Status BlockingRecv(uint8_t *buf, size_t amt, size_t *nread, const MonoTime& deadline);
 
+  // Enable TCP keepalive for the underlying socket. A TCP keepalive probe will be sent
+  // to the remote end after the connection has been idle for 'idle_time_s' seconds.
+  // It will retry sending probes up to 'num_retries' number of times until an ACK is
+  // heard from peer. 'retry_time_s' is the sleep time in seconds between successive
+  // keepalive probes.
+  Status SetTcpKeepAlive(int idle_time_s, int retry_time_s, int num_retries);
+
  private:
   // Called internally from SetSend/RecvTimeout().
   Status SetTimeout(int opt, const char* optname, const MonoDelta& timeout);


Mime
View raw message