Date: Thu, 27 Jun 2019 23:42:10 +0000
From: granthenke@apache.org
To: "commits@kudu.apache.org"
Reply-To: dev@kudu.apache.org
Subject: [kudu] branch master updated: KUDU-2192: Enable TCP keepalive for all outbound connections

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 89c02fd  KUDU-2192: Enable TCP keepalive for all outbound connections
89c02fd is described below

commit 89c02fded7595b4712b465bfb939e4f3035b2e75
Author: Michael Ho
AuthorDate: Thu Jun 20 18:34:02 2019 -0700

    KUDU-2192: Enable TCP keepalive for all outbound connections

    This change enables TCP keepalive for all outbound connections. It aims to
    handle cases in which the remote peer has dropped off the network without
    sending a TCP RST. For instance, a remote host could hit a kernel panic and
    get power cycled, in which case the existing TCP connection to that host
    becomes stale. In an idle cluster, such a stale connection may not be
    detected until its next use, at which point the RPC fails due to the TCP RST
    sent by the restarted peer. Enabling TCP keepalive ensures that stale TCP
    connections in an idle cluster are detected and closed within a bounded
    time, so a new connection is created on the next use.

    This change introduces 3 new flags:

    --tcp_keepalive_probe_period_s: the duration in seconds a TCP connection
      has to be idle before keepalive probes start to be sent.
    --tcp_keepalive_retry_period_s: the duration in seconds between successive
      keepalive probes if previous probes didn't get an ACK from the remote peer.
    --tcp_keepalive_retry_count: the maximum number of TCP keepalive probes
      sent without an ACK before declaring the remote peer dead.

    Testing:
    - Used tcpdump to verify that keepalive probes are sent periodically.
    - Verified that blocking all incoming traffic to a server's port via an
      iptables rule eventually caused the TCP connection to be closed and the
      keepalive probes to stop.

    Change-Id: Iaa1d66d83aea1cc82d07fc6217be5fc1306695bc
    Reviewed-on: http://gerrit.cloudera.org:8080/13702
    Reviewed-by: Alexey Serbin
    Reviewed-by: Todd Lipcon
    Tested-by: Kudu Jenkins
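For context, the mechanism described above reduces to four standard Linux
socket options. A minimal standalone sketch, not part of the patch (it uses a
plain POSIX socket rather than Kudu's Socket wrapper, and the helper name is
made up for illustration):

    #include <netinet/in.h>
    #include <netinet/tcp.h>
    #include <sys/socket.h>

    // Enable keepalive on a connected fd: an unreachable peer is detected
    // after roughly idle_s + retry_cnt * retry_s seconds of silence.
    int EnableTcpKeepAlive(int fd, int idle_s, int retry_s, int retry_cnt) {
      int on = 1;
      if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on)) != 0) return -1;
      // Seconds the connection must be idle before the first probe is sent.
      if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &idle_s, sizeof(idle_s)) != 0) return -1;
      // Seconds between successive unacknowledged probes.
      if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &retry_s, sizeof(retry_s)) != 0) return -1;
      // Unacknowledged probes allowed before the kernel drops the connection.
      if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &retry_cnt, sizeof(retry_cnt)) != 0) return -1;
      return 0;
    }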
---
 src/kudu/rpc/connection.cc  |  8 ++++++++
 src/kudu/rpc/connection.h   |  7 +++++++
 src/kudu/rpc/reactor.cc     | 33 ++++++++++++++++++++++++++++++++-
 src/kudu/rpc/reactor.h      |  4 ++++
 src/kudu/rpc/rpc-test.cc    | 37 ++++++++++++++++++++++++++++++++++++-
 src/kudu/util/net/socket.cc | 18 ++++++++++++++++++
 src/kudu/util/net/socket.h  |  7 +++++++
 7 files changed, 112 insertions(+), 2 deletions(-)

diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index 7a32a30..2b79464 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -217,6 +217,14 @@ Status Connection::SetNonBlocking(bool enabled) {
   return socket_->SetNonBlocking(enabled);
 }
 
+Status Connection::SetTcpKeepAlive(int idle_time_s, int retry_time_s, int num_retries) {
+  DCHECK_GT(idle_time_s, 0);
+  DCHECK_GE(retry_time_s, 0);
+  DCHECK_GE(num_retries, 0);
+  return socket_->SetTcpKeepAlive(std::max(1, idle_time_s), std::max(0, retry_time_s),
+                                  std::max(0, num_retries));
+}
+
 void Connection::EpollRegister(ev::loop_ref& loop) {
   DCHECK(reactor_thread_->IsCurrentThread());
   DVLOG(4) << "Registering connection for epoll: " << ToString();
diff --git a/src/kudu/rpc/connection.h b/src/kudu/rpc/connection.h
index 9fb6a6c..8f80b5a 100644
--- a/src/kudu/rpc/connection.h
+++ b/src/kudu/rpc/connection.h
@@ -99,6 +99,13 @@ class Connection : public RefCountedThreadSafe {
   // Set underlying socket to non-blocking (or blocking) mode.
   Status SetNonBlocking(bool enabled);
 
+  // Enable TCP keepalive for the underlying socket. A TCP keepalive probe will be sent
+  // to the remote end after the connection has been idle for 'idle_time_s' seconds.
+  // It will retry sending probes up to 'num_retries' number of times until an ACK is
+  // heard from peer. 'retry_time_s' is the sleep time in seconds between successive
+  // keepalive probes.
+  Status SetTcpKeepAlive(int idle_time_s, int retry_time_s, int num_retries);
+
   // Register our socket with an epoll loop. We will only ever be registered in
   // one epoll loop at a time.
   void EpollRegister(ev::loop_ref& loop);
" + "Effective only if --tcp_keepalive_probe_period_s is not 0."); +DEFINE_int32(tcp_keepalive_retry_count, 10, + "The maximum number of keepalive probes sent before declaring the remote " + "end as dead. Effective only if --tcp_keepalive_probe_period_s is not 0."); +TAG_FLAG(tcp_keepalive_probe_period_s, advanced); +TAG_FLAG(tcp_keepalive_retry_period_s, advanced); +TAG_FLAG(tcp_keepalive_retry_count, advanced); + METRIC_DEFINE_histogram(server, reactor_load_percent, "Reactor Thread Load Percentage", kudu::MetricUnit::kUnits, @@ -133,7 +149,8 @@ ReactorThread::ReactorThread(Reactor *reactor, const MessengerBuilder& bld) connection_keepalive_time_(bld.connection_keepalive_time_), coarse_timer_granularity_(bld.coarse_timer_granularity_), total_client_conns_cnt_(0), - total_server_conns_cnt_(0) { + total_server_conns_cnt_(0), + rng_(GetRandomSeed32()) { if (bld.metric_entity_) { invoke_us_histogram_ = @@ -608,6 +625,20 @@ void ReactorThread::CompleteConnectionNegotiation( return; } + if (FLAGS_tcp_keepalive_probe_period_s > 0) { + // Try spreading out the idle poll period to avoid thundering herd in case connections + // are all created at the same time (e.g. after a cluster is restarted). + Status keepalive_status = conn->SetTcpKeepAlive( + FLAGS_tcp_keepalive_probe_period_s + rng_.Uniform32(4), + FLAGS_tcp_keepalive_retry_period_s, FLAGS_tcp_keepalive_retry_count); + if (PREDICT_FALSE(!keepalive_status.ok())) { + LOG(DFATAL) << "Unable to set TCP keepalive for connection: " + << keepalive_status.ToString(); + DestroyConnection(conn.get(), keepalive_status, std::move(rpc_error)); + return; + } + } + conn->MarkNegotiationComplete(); conn->EpollRegister(loop_); } diff --git a/src/kudu/rpc/reactor.h b/src/kudu/rpc/reactor.h index a74a3bb..4ab7ef6 100644 --- a/src/kudu/rpc/reactor.h +++ b/src/kudu/rpc/reactor.h @@ -37,6 +37,7 @@ #include "kudu/util/locks.h" #include "kudu/util/metrics.h" #include "kudu/util/monotime.h" +#include "kudu/util/random.h" #include "kudu/util/status.h" #include "kudu/util/thread.h" @@ -327,6 +328,9 @@ class ReactorThread { // started. int64_t total_poll_cycles_ = 0; + // Random number generator for randomizing the TCP keepalive interval. + Random rng_; + // Accounting for determining load average in each cycle of TimerHandler. struct { // The cycle-time at which the load average was last calculated. diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc index 0d70bbf..88fe7fa 100644 --- a/src/kudu/rpc/rpc-test.cc +++ b/src/kudu/rpc/rpc-test.cc @@ -72,6 +72,9 @@ METRIC_DECLARE_histogram(rpc_incoming_queue_time); DECLARE_bool(rpc_reopen_outbound_connections); DECLARE_int32(rpc_negotiation_inject_delay_ms); +DECLARE_int32(tcp_keepalive_probe_period_s); +DECLARE_int32(tcp_keepalive_retry_period_s); +DECLARE_int32(tcp_keepalive_retry_count); using std::shared_ptr; using std::string; @@ -781,6 +784,34 @@ TEST_P(TestRpc, TestCallLongerThanKeepalive) { req, &resp, &controller)); } +// Test a call which leaves the TCP connection idle for extended period of time +// and verifies that the call succeeds (i.e. the connection is not closed). +TEST_P(TestRpc, TestTCPKeepalive) { + // Set up server. + Sockaddr server_addr; + bool enable_ssl = GetParam(); + ASSERT_OK(StartTestServer(&server_addr, enable_ssl)); + + // Set up client. 
diff --git a/src/kudu/rpc/reactor.h b/src/kudu/rpc/reactor.h
index a74a3bb..4ab7ef6 100644
--- a/src/kudu/rpc/reactor.h
+++ b/src/kudu/rpc/reactor.h
@@ -37,6 +37,7 @@
 #include "kudu/util/locks.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/random.h"
 #include "kudu/util/status.h"
 #include "kudu/util/thread.h"
@@ -327,6 +328,9 @@ class ReactorThread {
   // started.
   int64_t total_poll_cycles_ = 0;
 
+  // Random number generator for randomizing the TCP keepalive interval.
+  Random rng_;
+
   // Accounting for determining load average in each cycle of TimerHandler.
   struct {
     // The cycle-time at which the load average was last calculated.
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 0d70bbf..88fe7fa 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -72,6 +72,9 @@
 METRIC_DECLARE_histogram(rpc_incoming_queue_time);
 DECLARE_bool(rpc_reopen_outbound_connections);
 DECLARE_int32(rpc_negotiation_inject_delay_ms);
+DECLARE_int32(tcp_keepalive_probe_period_s);
+DECLARE_int32(tcp_keepalive_retry_period_s);
+DECLARE_int32(tcp_keepalive_retry_count);
 
 using std::shared_ptr;
 using std::string;
@@ -781,6 +784,34 @@ TEST_P(TestRpc, TestCallLongerThanKeepalive) {
                           req, &resp, &controller));
 }
 
+// Test a call which leaves the TCP connection idle for extended period of time
+// and verifies that the call succeeds (i.e. the connection is not closed).
+TEST_P(TestRpc, TestTCPKeepalive) {
+  // Set up server.
+  Sockaddr server_addr;
+  bool enable_ssl = GetParam();
+  ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+
+  // Set up client.
+  FLAGS_tcp_keepalive_probe_period_s = 1;
+  FLAGS_tcp_keepalive_retry_period_s = 1;
+  FLAGS_tcp_keepalive_retry_count = 1;
+  shared_ptr client_messenger;
+  ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
+
+  // Make a call which sleeps for longer than TCP keepalive probe period,
+  // triggering TCP keepalive probes.
+  RpcController controller;
+  SleepRequestPB req;
+  req.set_sleep_micros(8 * 1000 * 1000); // 8 seconds.
+  req.set_deferred(true);
+  SleepResponsePB resp;
+  ASSERT_OK(p.SyncRequest(GenericCalculatorService::kSleepMethodName,
+                          req, &resp, &controller));
+}
+
 // Test that the RpcSidecar transfers the expected messages.
 TEST_P(TestRpc, TestRpcSidecar) {
   // Set up server.
@@ -1067,11 +1098,15 @@ TEST_F(TestRpc, TestServerShutsDown) {
     // EINVAL is possible if the controller socket had already disconnected by
     // the time it trys to set the SO_SNDTIMEO socket option as part of the
     // normal blocking SASL handshake.
+    //
+    // ENOTCONN is possible simply because the server closes the connection
+    // after the connection is established.
     ASSERT_TRUE(s.posix_code() == EPIPE ||
                 s.posix_code() == ECONNRESET ||
                 s.posix_code() == ESHUTDOWN ||
                 s.posix_code() == ECONNREFUSED ||
-                s.posix_code() == EINVAL)
+                s.posix_code() == EINVAL ||
+                s.posix_code() == ENOTCONN)
       << "Unexpected status: " << s.ToString();
   }
 }
diff --git a/src/kudu/util/net/socket.cc b/src/kudu/util/net/socket.cc
index 2ffb5cf..018b16c 100644
--- a/src/kudu/util/net/socket.cc
+++ b/src/kudu/util/net/socket.cc
@@ -585,6 +585,24 @@ Status Socket::SetTimeout(int opt, const char* optname, const MonoDelta& timeout
   return Status::OK();
 }
 
+Status Socket::SetTcpKeepAlive(int idle_time_s, int retry_time_s, int num_retries) {
+#if defined(__linux__)
+  static const char* const err_string = "failed to set socket option $0 to $1";
+  DCHECK_GT(idle_time_s, 0);
+  RETURN_NOT_OK_PREPEND(SetSockOpt(IPPROTO_TCP, TCP_KEEPIDLE, idle_time_s),
+                        Substitute(err_string, "TCP_KEEPIDLE", idle_time_s));
+  DCHECK_GT(retry_time_s, 0);
+  RETURN_NOT_OK_PREPEND(SetSockOpt(IPPROTO_TCP, TCP_KEEPINTVL, retry_time_s),
+                        Substitute(err_string, "TCP_KEEPINTVL", retry_time_s));
+  DCHECK_GT(num_retries, 0);
+  RETURN_NOT_OK_PREPEND(SetSockOpt(IPPROTO_TCP, TCP_KEEPCNT, num_retries),
+                        Substitute(err_string, "TCP_KEEPCNT", num_retries));
+  RETURN_NOT_OK_PREPEND(SetSockOpt(SOL_SOCKET, SO_KEEPALIVE, 1),
+                        "failed to enable TCP KeepAlive socket option");
+#endif
+  return Status::OK();
+}
+
 template
 Status Socket::SetSockOpt(int level, int option, const T& value) {
   if (::setsockopt(fd_, level, option, &value, sizeof(T)) == -1) {
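For illustration only, not part of the patch: a possible caller of the new
Socket API declared below in socket.h. The helper name and the choice to
mirror the flag defaults are assumptions.

    #include "kudu/util/net/socket.h"
    #include "kudu/util/status.h"

    // Probe after 60s of idleness, then every 3s, and give up after 10
    // unacknowledged probes, i.e. the same values as the new flags' defaults.
    kudu::Status EnableDefaultKeepAlive(kudu::Socket* sock) {
      return sock->SetTcpKeepAlive(/*idle_time_s=*/60, /*retry_time_s=*/3,
                                   /*num_retries=*/10);
    }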
diff --git a/src/kudu/util/net/socket.h b/src/kudu/util/net/socket.h
index 992c44a..0f561a7 100644
--- a/src/kudu/util/net/socket.h
+++ b/src/kudu/util/net/socket.h
@@ -153,6 +153,13 @@ class Socket {
   // See also readn() from Stevens (2004) or Kerrisk (2010)
   Status BlockingRecv(uint8_t *buf, size_t amt, size_t *nread, const MonoTime& deadline);
 
+  // Enable TCP keepalive for the underlying socket. A TCP keepalive probe will be sent
+  // to the remote end after the connection has been idle for 'idle_time_s' seconds.
+  // It will retry sending probes up to 'num_retries' number of times until an ACK is
+  // heard from peer. 'retry_time_s' is the sleep time in seconds between successive
+  // keepalive probes.
+  Status SetTcpKeepAlive(int idle_time_s, int retry_time_s, int num_retries);
+
  private:
   // Called internally from SetSend/RecvTimeout().
   Status SetTimeout(int opt, const char* optname, const MonoDelta& timeout);