kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From danburk...@apache.org
Subject [3/4] kudu git commit: KUDU-2237: Allow idle server connection scanning to be disabled
Date Thu, 21 Dec 2017 05:58:11 GMT
KUDU-2237: Allow idle server connection scanning to be disabled

Currently, a server connection that has been idle for more than
FLAGS_rpc_default_keepalive_time_ms ms will be closed.
However, certain services (e.g. Impala) using KRPC may want to
keep idle connections alive for various reasons (e.g. the sheer
number of connections to re-establish, or the negotiation overhead
in a secure cluster). To keep idle connections from being
closed, one currently has to set FLAGS_rpc_default_keepalive_time_ms
to a very large value.

This change implements a cleaner solution by disabling idle
connection scanning if FLAGS_rpc_default_keepalive_time_ms is
set to any negative value. This avoids the unnecessary
overhead of scanning for idle server connections and relieves
the user of having to pick an arbitrary large number to make sure
the connection is always kept alive.

Change-Id: I6161b9e753f05620784565a417d248acf8e7050a
Reviewed-on: http://gerrit.cloudera.org:8080/8831
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <todd@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/948ac0eb
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/948ac0eb
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/948ac0eb

Branch: refs/heads/master
Commit: 948ac0eb45b83cbba476cf722dd5c043ddac5de3
Parents: f2184c1
Author: Michael Ho <kwho@cloudera.com>
Authored: Wed Dec 13 14:30:25 2017 -0800
Committer: Todd Lipcon <todd@apache.org>
Committed: Thu Dec 21 04:48:14 2017 +0000

----------------------------------------------------------------------
 src/kudu/rpc/messenger.h       |  1 +
 src/kudu/rpc/reactor.cc        | 40 +++++++++++++++++++-----------------
 src/kudu/rpc/reactor.h         |  3 ++-
 src/kudu/rpc/rpc-test-base.h   | 15 +++++++-------
 src/kudu/rpc/rpc-test.cc       | 41 +++++++++++++++++++++++++++++++++++++
 src/kudu/server/server_base.cc |  3 ++-
 6 files changed, 75 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/948ac0eb/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index 6a0581b..e030a5c 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -322,6 +322,7 @@ class Messenger {
 
  private:
   FRIEND_TEST(TestRpc, TestConnectionKeepalive);
+  FRIEND_TEST(TestRpc, TestConnectionAlwaysKeepalive);
   FRIEND_TEST(TestRpc, TestCredentialsPolicy);
   FRIEND_TEST(TestRpc, TestReopenOutboundConnections);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/948ac0eb/src/kudu/rpc/reactor.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index 7f13cb8..3898196 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -403,28 +403,30 @@ void ReactorThread::ScanIdleConnections() {
   // Enforce TCP connection timeouts: server-side connections.
   const auto server_conns_end = server_conns_.end();
   uint64_t timed_out = 0;
-  for (auto it = server_conns_.begin(); it != server_conns_end; ) {
-    Connection* conn = it->get();
-    if (!conn->Idle()) {
-      VLOG(10) << "Connection " << conn->ToString() << " not idle";
-      ++it;
-      continue;
-    }
+  // Scan for idle server connections if it's enabled.
+  if (connection_keepalive_time_ >= MonoDelta::FromMilliseconds(0)) {
+    for (auto it = server_conns_.begin(); it != server_conns_end; ) {
+      Connection* conn = it->get();
+      if (!conn->Idle()) {
+        VLOG(10) << "Connection " << conn->ToString() << " not idle";
+        ++it;
+        continue;
+      }
 
-    const MonoDelta connection_delta(cur_time_ - conn->last_activity_time());
-    if (connection_delta <= connection_keepalive_time_) {
-      ++it;
-      continue;
-    }
+      const MonoDelta connection_delta(cur_time_ - conn->last_activity_time());
+      if (connection_delta <= connection_keepalive_time_) {
+        ++it;
+        continue;
+      }
 
-    conn->Shutdown(Status::NetworkError(
-        Substitute("connection timed out after $0", connection_keepalive_time_.ToString())));
-    VLOG(1) << "Timing out connection " << conn->ToString() << " - it
has been idle for "
-            << connection_delta.ToString();
-    ++timed_out;
-    it = server_conns_.erase(it);
+      conn->Shutdown(Status::NetworkError(
+          Substitute("connection timed out after $0", connection_keepalive_time_.ToString())));
+      VLOG(1) << "Timing out connection " << conn->ToString() << " -
it has been idle for "
+              << connection_delta.ToString();
+      ++timed_out;
+      it = server_conns_.erase(it);
+    }
   }
-
   // Take care of idle client-side connections marked for shutdown.
   uint64_t shutdown = 0;
   for (auto it = client_conns_.begin(); it != client_conns_.end();) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/948ac0eb/src/kudu/rpc/reactor.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/reactor.h b/src/kudu/rpc/reactor.h
index 288e38e..370c4b7 100644
--- a/src/kudu/rpc/reactor.h
+++ b/src/kudu/rpc/reactor.h
@@ -244,7 +244,8 @@ class ReactorThread {
                          std::unique_ptr<ErrorStatusPB> rpc_error = {});
 
   // Scan any open connections for idle ones that have been idle longer than
-  // connection_keepalive_time_
+  // connection_keepalive_time_. If connection_keepalive_time_ < 0, the scan
+  // is skipped.
   void ScanIdleConnections();
 
   // Create a new client socket (non-blocking, NODELAY)

http://git-wip-us.apache.org/repos/asf/kudu/blob/948ac0eb/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index 53ba71d..cc2480e 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -445,13 +445,14 @@ class RpcTestBase : public KuduTest {
     }
 
     bld.set_num_reactors(n_reactors);
-    bld.set_connection_keepalive_time(
-      MonoDelta::FromMilliseconds(keepalive_time_ms_));
-    // In order for the keepalive timing to be accurate, we need to scan connections
-    // significantly more frequently than the keepalive time. This "coarse timer"
-    // granularity determines this.
-    bld.set_coarse_timer_granularity(MonoDelta::FromMilliseconds(
-                                       std::min(keepalive_time_ms_ / 5, 100)));
+    bld.set_connection_keepalive_time(MonoDelta::FromMilliseconds(keepalive_time_ms_));
+    if (keepalive_time_ms_ >= 0) {
+      // In order for the keepalive timing to be accurate, we need to scan connections
+      // significantly more frequently than the keepalive time. This "coarse timer"
+      // granularity determines this.
+      bld.set_coarse_timer_granularity(
+          MonoDelta::FromMilliseconds(std::min(keepalive_time_ms_ / 5, 100)));
+    }
     bld.set_metric_entity(metric_entity_);
     std::shared_ptr<Messenger> messenger;
     CHECK_OK(bld.Build(&messenger));

http://git-wip-us.apache.org/repos/asf/kudu/blob/948ac0eb/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index d3360f8..26d0a7b 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -403,6 +403,47 @@ TEST_P(TestRpc, TestConnectionKeepalive) {
   ASSERT_EQ(0, metrics.num_client_connections_) << "Client should have 0 client connections";
 }
 
+// Test that idle connection is kept alive when 'keepalive_time_ms_' is set to -1.
+TEST_P(TestRpc, TestConnectionAlwaysKeepalive) {
+  // Only run one reactor per messenger, so we can grab the metrics from that
+  // one without having to check all.
+  n_server_reactor_threads_ = 1;
+  keepalive_time_ms_ = -1;
+
+  // Set up server.
+  Sockaddr server_addr;
+  bool enable_ssl = GetParam();
+  StartTestServer(&server_addr, enable_ssl);
+
+  // Set up client.
+  LOG(INFO) << "Connecting to " << server_addr.ToString();
+  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
+
+  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+
+  ReactorMetrics metrics;
+  ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
+  ASSERT_EQ(1, metrics.num_server_connections_) << "Server should have 1 server connection";
+  ASSERT_EQ(0, metrics.num_client_connections_) << "Server should have 0 client connections";
+
+  ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
+  ASSERT_EQ(0, metrics.num_server_connections_) << "Client should have 0 server connections";
+  ASSERT_EQ(1, metrics.num_client_connections_) << "Client should have 1 client connections";
+
+  SleepFor(MonoDelta::FromSeconds(3));
+
+  // After sleeping, the connection should still be alive.
+  ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
+  ASSERT_EQ(1, metrics.num_server_connections_) << "Server should have 1 server connections";
+  ASSERT_EQ(0, metrics.num_client_connections_) << "Server should have 0 client connections";
+
+  ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
+  ASSERT_EQ(0, metrics.num_server_connections_) << "Client should have 0 server connections";
+  ASSERT_EQ(1, metrics.num_client_connections_) << "Client should have 1 client connections";
+}
+
 // Test that outbound connections to the same server are reopen upon every RPC
 // call when the 'rpc_reopen_outbound_connections' flag is set.
 TEST_P(TestRpc, TestReopenOutboundConnections) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/948ac0eb/src/kudu/server/server_base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index 269c098..90e60ed 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -199,7 +199,8 @@ TAG_FLAG(rpc_ca_certificate_file, experimental);
 
 DEFINE_int32(rpc_default_keepalive_time_ms, 65000,
              "If an RPC connection from a client is idle for this amount of time, the server
"
-             "will disconnect the client.");
+             "will disconnect the client. Setting this to any negative value keeps connections
"
+             "always alive.");
 TAG_FLAG(rpc_default_keepalive_time_ms, advanced);
 
 DECLARE_bool(use_hybrid_clock);


Mime
View raw message