kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject [kudu] 05/05: rpc: add TCP socket statistics to /rpcz
Date Mon, 28 Jan 2019 23:58:25 GMT
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 0f6d33b4a29873197952335a5777ccf9163fc307
Author: Todd Lipcon <todd@apache.org>
AuthorDate: Tue Jan 8 15:01:29 2019 -0800

    rpc: add TCP socket statistics to /rpcz
    
    This adds the ability to fetch various bits of socket-level information
    for each RPC connection and publish the info into /rpcz. The information
    itself is fetched using getsockopt(TCP_INFO) as well as ioctls to check
    the current send and receive queue lengths.
    
    This data can help resolve whether a use case is network bound or bound
    by the application itself. For example, a high number of retransmitted
    packets can indicate that the network path to the receiver is
    overloaded.
    
    Eventually we may want to expose some of this information on a per-call
    basis. However, doing so is quite tricky, since 'send()' completes when
    the data has been placed into the outbound packet queue and doesn't wait
    until the data is ACKed. We'd need to defer checking for retransmissions
    until all of the data has been ACKed, which is at some indeterminate
    point in the future. The very newest kernels allow subscribing to such
    notifications (along with lots of interesting stats) but, given none of
    that is available in el7, it's probably not worth tackling at this
    point.
    
    Change-Id: I552c9dd80c0730ccd6bf7b13bb63761744a854c2
    Reviewed-on: http://gerrit.cloudera.org:8080/12184
    Reviewed-by: Will Berkeley <wdberkeley@gmail.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/rpc/connection.cc           | 226 ++++++++++++++++++++++++++++++++++-
 src/kudu/rpc/connection.h            |   7 +-
 src/kudu/rpc/inbound_call.cc         |   2 +-
 src/kudu/rpc/inbound_call.h          |   4 +-
 src/kudu/rpc/messenger.cc            |   6 +-
 src/kudu/rpc/messenger.h             |  10 +-
 src/kudu/rpc/outbound_call.cc        |   2 +-
 src/kudu/rpc/outbound_call.h         |   4 +-
 src/kudu/rpc/reactor.cc              |  12 +-
 src/kudu/rpc/reactor.h               |  12 +-
 src/kudu/rpc/rpc-test.cc             |  21 +++-
 src/kudu/rpc/rpc_introspection.proto |  34 +++++-
 src/kudu/rpc/rpc_stub-test.cc        |   8 +-
 src/kudu/server/rpcz-path-handler.cc |  10 +-
 14 files changed, 313 insertions(+), 45 deletions(-)

diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index 1632dd3..7a32a30 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -17,6 +17,9 @@
 
 #include "kudu/rpc/connection.h"
 
+#include <netinet/in.h>
+#include <string.h>
+
 #include <algorithm>
 #include <cerrno>
 #include <iostream>
@@ -31,7 +34,6 @@
 #include <glog/logging.h>
 
 #include "kudu/gutil/map-util.h"
-#include "kudu/util/slice.h"
 #include "kudu/gutil/strings/human_readable.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/inbound_call.h"
@@ -44,8 +46,15 @@
 #include "kudu/rpc/transfer.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
+#include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
+#ifdef __linux__
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+#include <linux/tcp.h>
+#endif
+
 using std::includes;
 using std::set;
 using std::shared_ptr;
@@ -57,6 +66,132 @@ namespace rpc {
 
 typedef OutboundCall::Phase Phase;
 
+namespace {
+
+// tcp_info struct duplicated from linux/tcp.h.
+//
+// This allows us to decouple the compile-time Linux headers from the
+// runtime Linux kernel. The compile-time headers (and kernel) might be
+// older than the runtime kernel, in which case an ifdef-based approach
+// wouldn't allow us to get all of the info available.
+//
+// NOTE: this struct has been annotated with some local notes about the
+// contents of each field.
+struct tcp_info {
+  // Various state-tracking information.
+  // ------------------------------------------------------------
+  uint8_t    tcpi_state;
+  uint8_t    tcpi_ca_state;
+  uint8_t    tcpi_retransmits;
+  uint8_t    tcpi_probes;
+  uint8_t    tcpi_backoff;
+  uint8_t    tcpi_options;
+  uint8_t    tcpi_snd_wscale : 4, tcpi_rcv_wscale : 4;
+  uint8_t    tcpi_delivery_rate_app_limited:1;
+
+  // Configurations.
+  // ------------------------------------------------------------
+  uint32_t   tcpi_rto;
+  uint32_t   tcpi_ato;
+  uint32_t   tcpi_snd_mss;
+  uint32_t   tcpi_rcv_mss;
+
+  // Counts of packets in various states in the outbound queue.
+  // At first glance one might think these are monotonic counters, but
+  // in fact they are instantaneous counts of queued packets and thus
+  // not very useful for our purposes.
+  // ------------------------------------------------------------
+  // Number of packets outstanding that haven't been acked.
+  uint32_t   tcpi_unacked;
+
+  // Number of packets outstanding that have been selective-acked.
+  uint32_t   tcpi_sacked;
+
+  // Number of packets outstanding that have been deemed lost (a SACK arrived
+  // for a later packet)
+  uint32_t   tcpi_lost;
+
+  // Number of packets in the queue that have been retransmitted.
+  uint32_t   tcpi_retrans;
+
+  // The number of packets towards the highest SACKed sequence number
+  // (some measure of reordering, removed in later Linux versions by
+  // 737ff314563ca27f044f9a3a041e9d42491ef7ce)
+  uint32_t   tcpi_fackets;
+
+  // Times when various events occurred.
+  // ------------------------------------------------------------
+  uint32_t   tcpi_last_data_sent;
+  uint32_t   tcpi_last_ack_sent;     /* Not remembered, sorry. */
+  uint32_t   tcpi_last_data_recv;
+  uint32_t   tcpi_last_ack_recv;
+
+  // Path MTU.
+  uint32_t   tcpi_pmtu;
+
+  // Receiver slow start threshold.
+  uint32_t   tcpi_rcv_ssthresh;
+
+  // Smoothed RTT estimate and variance based on the time between sending data and receiving
+  // corresponding ACK, in microseconds. See https://tools.ietf.org/html/rfc2988 for details.
+  uint32_t   tcpi_rtt;
+  uint32_t   tcpi_rttvar;
+
+  // Slow start threshold.
+  uint32_t   tcpi_snd_ssthresh;
+  // Sender congestion window (in number of MSS-sized packets)
+  uint32_t   tcpi_snd_cwnd;
+  // Advertised MSS.
+  uint32_t   tcpi_advmss;
+  // Amount of packet reordering allowed.
+  uint32_t   tcpi_reordering;
+
+  // Receiver-side RTT estimate per the Dynamic Right Sizing algorithm:
+  //
+  // "A system that is only transmitting acknowledgements can still estimate the round-trip
+  // time by observing the time between when a byte is first acknowledged and the receipt of
+  // data that is at least one window beyond the sequence number that was acknowledged. If the
+  // sender is being throttled by the network, this estimate will be valid. However, if the
+  // sending application did not have any data to send, the measured time could be much larger
+  // than the actual round-trip time. Thus this measurement acts only as an upper-bound on the
+  // round-trip time and should be used only when it is the only source of round-trip time
+  // information."
+  uint32_t   tcpi_rcv_rtt;
+  uint32_t   tcpi_rcv_space;
+
+  // Total number of retransmitted packets.
+  uint32_t   tcpi_total_retrans;
+
+  // Pacing-related metrics.
+  uint64_t   tcpi_pacing_rate;
+  uint64_t   tcpi_max_pacing_rate;
+
+  // Total bytes ACKed by remote peer.
+  uint64_t   tcpi_bytes_acked;    /* RFC4898 tcpEStatsAppHCThruOctetsAcked */
+  // Total bytes received (for which ACKs have been sent out).
+  uint64_t   tcpi_bytes_received; /* RFC4898 tcpEStatsAppHCThruOctetsReceived */
+  // Segments sent and received.
+  uint32_t   tcpi_segs_out;       /* RFC4898 tcpEStatsPerfSegsOut */
+  uint32_t   tcpi_segs_in;        /* RFC4898 tcpEStatsPerfSegsIn */
+
+  // The following metrics are quite new and not in el7.
+  // ------------------------------------------------------------
+  uint32_t   tcpi_notsent_bytes;
+  uint32_t   tcpi_min_rtt;
+  uint32_t   tcpi_data_segs_in;      /* RFC4898 tcpEStatsDataSegsIn */
+  uint32_t   tcpi_data_segs_out;     /* RFC4898 tcpEStatsDataSegsOut */
+
+  // Calculated rate at which data was delivered.
+  uint64_t   tcpi_delivery_rate;
+
+  // Timers for various states.
+  uint64_t   tcpi_busy_time;      /* Time (usec) busy sending data */
+  uint64_t   tcpi_rwnd_limited;   /* Time (usec) limited by receive window */
+  uint64_t   tcpi_sndbuf_limited; /* Time (usec) limited by send buffer */
+};
+
+} // anonymous namespace
+
 ///
 /// Connection
 ///
@@ -728,7 +863,7 @@ void Connection::MarkNegotiationComplete() {
   negotiation_complete_ = true;
 }
 
-Status Connection::DumpPB(const DumpRunningRpcsRequestPB& req,
+Status Connection::DumpPB(const DumpConnectionsRequestPB& req,
                           RpcConnectionPB* resp) {
   DCHECK(reactor_thread_->IsCurrentThread());
   resp->set_remote_ip(remote_.ToString());
@@ -760,8 +895,95 @@ Status Connection::DumpPB(const DumpRunningRpcsRequestPB& req,
   } else {
     LOG(FATAL);
   }
+#ifdef __linux__
+  if (negotiation_complete_) {
+    // TODO(todd): it's a little strange to not set socket level stats during
+    // negotiation, but we don't have access to the socket here until negotiation
+    // is complete.
+    WARN_NOT_OK(GetSocketStatsPB(resp->mutable_socket_stats()),
+                "could not fill in TCP info for RPC connection");
+  }
+#endif // __linux__
+  return Status::OK();
+}
+
+#ifdef __linux__
+Status Connection::GetSocketStatsPB(SocketStatsPB* pb) const {
+  DCHECK(reactor_thread_->IsCurrentThread());
+  int fd = socket_->GetFd();
+  CHECK_GE(fd, 0);
+
+  // Fetch TCP_INFO statistics from the kernel.
+  tcp_info ti;
+  memset(&ti, 0, sizeof(ti));
+  socklen_t len = sizeof(ti);
+  int rc = getsockopt(fd, IPPROTO_TCP, TCP_INFO, &ti, &len);
+  if (rc == 0) {
+#   define HAS_FIELD(field_name) \
+        (len >= offsetof(tcp_info, field_name) + sizeof(ti.field_name))
+    if (!HAS_FIELD(tcpi_total_retrans)) {
+      // All the fields up through tcpi_total_retrans were present since very old
+      // kernel versions, beyond our minimal supported. So, we can just bail if we
+      // don't get sufficient data back.
+      return Status::NotSupported("bad length returned for TCP_INFO");
+    }
+
+    pb->set_rtt(ti.tcpi_rtt);
+    pb->set_rttvar(ti.tcpi_rttvar);
+    pb->set_snd_cwnd(ti.tcpi_snd_cwnd);
+    pb->set_total_retrans(ti.tcpi_total_retrans);
+
+    // The following fields were added later in kernel development history.
+    // In RHEL6 they were backported starting in 6.8. Even though they were
+    // backported all together as a group, we'll just be safe and check for
+    // each individually.
+    if (HAS_FIELD(tcpi_pacing_rate)) {
+      pb->set_pacing_rate(ti.tcpi_pacing_rate);
+    }
+    if (HAS_FIELD(tcpi_max_pacing_rate)) {
+      pb->set_max_pacing_rate(ti.tcpi_max_pacing_rate);
+    }
+    if (HAS_FIELD(tcpi_bytes_acked)) {
+      pb->set_bytes_acked(ti.tcpi_bytes_acked);
+    }
+    if (HAS_FIELD(tcpi_bytes_received)) {
+      pb->set_bytes_received(ti.tcpi_bytes_received);
+    }
+    if (HAS_FIELD(tcpi_segs_out)) {
+      pb->set_segs_out(ti.tcpi_segs_out);
+    }
+    if (HAS_FIELD(tcpi_segs_in)) {
+      pb->set_segs_in(ti.tcpi_segs_in);
+    }
+
+    // Calculate sender bandwidth based on the same logic used by the 'ss' utility.
+    if (ti.tcpi_rtt > 0 && ti.tcpi_snd_mss && ti.tcpi_snd_cwnd) {
+      // Units:
+      //  rtt = usec
+      //  cwnd = number of MSS-size packets
+      //  mss = bytes / packet
+      //
+      // Dimensional analysis:
+      //   packets * bytes/packet * usecs/sec / usec -> bytes/sec
+      static constexpr int kUsecsPerSec = 1000000;
+      pb->set_send_bytes_per_sec(static_cast<int64_t>(ti.tcpi_snd_cwnd) *
+                                 ti.tcpi_snd_mss * kUsecsPerSec / ti.tcpi_rtt);
+    }
+  }
+
+  // Fetch the queue sizes.
+  int queue_len = 0;
+  rc = ioctl(fd, TIOCOUTQ, &queue_len);
+  if (rc == 0) {
+    pb->set_send_queue_bytes(queue_len);
+  }
+  rc = ioctl(fd, FIONREAD, &queue_len);
+  if (rc == 0) {
+    pb->set_receive_queue_bytes(queue_len);
+  }
   return Status::OK();
 }
+#endif // __linux__
 
 } // namespace rpc
 } // namespace kudu
diff --git a/src/kudu/rpc/connection.h b/src/kudu/rpc/connection.h
index 362a35b..9fb6a6c 100644
--- a/src/kudu/rpc/connection.h
+++ b/src/kudu/rpc/connection.h
@@ -50,12 +50,13 @@ namespace kudu {
 
 namespace rpc {
 
-class DumpRunningRpcsRequestPB;
+class DumpConnectionsRequestPB;
 class InboundCall;
 class OutboundCall;
 class RpcConnectionPB;
 class ReactorThread;
 class RpczStore;
+class SocketStatsPB;
 enum class CredentialsPolicy;
 
 //
@@ -193,7 +194,7 @@ class Connection : public RefCountedThreadSafe<Connection> {
   // Indicate that negotiation is complete and that the Reactor is now in control of the socket.
   void MarkNegotiationComplete();
 
-  Status DumpPB(const DumpRunningRpcsRequestPB& req,
+  Status DumpPB(const DumpConnectionsRequestPB& req,
                 RpcConnectionPB* resp);
 
   ReactorThread* reactor_thread() const { return reactor_thread_; }
@@ -298,6 +299,8 @@ class Connection : public RefCountedThreadSafe<Connection> {
   // reaches state specified in 'FLAGS_rpc_inject_cancellation_state'.
   void MaybeInjectCancellation(const std::shared_ptr<OutboundCall> &call);
 
+  Status GetSocketStatsPB(SocketStatsPB* pb) const;
+
   // The reactor thread that created this connection.
   ReactorThread* const reactor_thread_;
 
diff --git a/src/kudu/rpc/inbound_call.cc b/src/kudu/rpc/inbound_call.cc
index 6920071..655c453 100644
--- a/src/kudu/rpc/inbound_call.cc
+++ b/src/kudu/rpc/inbound_call.cc
@@ -250,7 +250,7 @@ string InboundCall::ToString() const {
                       header_.call_id());
 }
 
-void InboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,
+void InboundCall::DumpPB(const DumpConnectionsRequestPB& req,
                          RpcCallInProgressPB* resp) {
   resp->mutable_header()->CopyFrom(header_);
   if (req.include_traces() && trace_) {
diff --git a/src/kudu/rpc/inbound_call.h b/src/kudu/rpc/inbound_call.h
index 0db4c37..07c57dc 100644
--- a/src/kudu/rpc/inbound_call.h
+++ b/src/kudu/rpc/inbound_call.h
@@ -54,7 +54,7 @@ class Trace;
 namespace rpc {
 
 class Connection;
-class DumpRunningRpcsRequestPB;
+class DumpConnectionsRequestPB;
 class RemoteUser;
 class RpcCallInProgressPB;
 class RpcSidecar;
@@ -135,7 +135,7 @@ class InboundCall {
 
   std::string ToString() const;
 
-  void DumpPB(const DumpRunningRpcsRequestPB& req, RpcCallInProgressPB* resp);
+  void DumpPB(const DumpConnectionsRequestPB& req, RpcCallInProgressPB* resp);
 
   const RemoteUser& remote_user() const;
 
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index 17ac0c5..4129172 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -449,11 +449,11 @@ Status Messenger::Init() {
   return Status::OK();
 }
 
-Status Messenger::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
-                                  DumpRunningRpcsResponsePB* resp) {
+Status Messenger::DumpConnections(const DumpConnectionsRequestPB& req,
+                                  DumpConnectionsResponsePB* resp) {
   shared_lock<rw_spinlock> guard(lock_.get_lock());
   for (Reactor* reactor : reactors_) {
-    RETURN_NOT_OK(reactor->DumpRunningRpcs(req, resp));
+    RETURN_NOT_OK(reactor->DumpConnections(req, resp));
   }
   return Status::OK();
 }
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index b3a78e0..56b087c 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -60,8 +60,8 @@ using security::RpcAuthentication;
 using security::RpcEncryption;
 
 class AcceptorPool;
-class DumpRunningRpcsRequestPB;
-class DumpRunningRpcsResponsePB;
+class DumpConnectionsRequestPB;
+class DumpConnectionsResponsePB;
 class InboundCall;
 class Messenger;
 class OutboundCall;
@@ -265,9 +265,9 @@ class Messenger {
   // Take ownership of the socket via Socket::Release
   void RegisterInboundSocket(Socket *new_socket, const Sockaddr &remote);
 
-  // Dump the current RPCs into the given protobuf.
-  Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
-                         DumpRunningRpcsResponsePB* resp);
+  // Dump info on related TCP connections into the given protobuf.
+  Status DumpConnections(const DumpConnectionsRequestPB& req,
+                         DumpConnectionsResponsePB* resp);
 
   // Run 'func' on a reactor thread after 'when' time elapses.
   //
diff --git a/src/kudu/rpc/outbound_call.cc b/src/kudu/rpc/outbound_call.cc
index 37d02ac..17761f5 100644
--- a/src/kudu/rpc/outbound_call.cc
+++ b/src/kudu/rpc/outbound_call.cc
@@ -449,7 +449,7 @@ string OutboundCall::ToString() const {
   return Substitute("RPC call $0 -> $1", remote_method_.ToString(), conn_id_.ToString());
 }
 
-void OutboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,
+void OutboundCall::DumpPB(const DumpConnectionsRequestPB& req,
                           RpcCallInProgressPB* resp) {
   std::lock_guard<simple_spinlock> l(lock_);
   resp->mutable_header()->CopyFrom(header_);
diff --git a/src/kudu/rpc/outbound_call.h b/src/kudu/rpc/outbound_call.h
index 8d43891..c48e496 100644
--- a/src/kudu/rpc/outbound_call.h
+++ b/src/kudu/rpc/outbound_call.h
@@ -55,7 +55,7 @@ namespace kudu {
 namespace rpc {
 
 class CallResponse;
-class DumpRunningRpcsRequestPB;
+class DumpConnectionsRequestPB;
 class RpcCallInProgressPB;
 class RpcController;
 class RpcSidecar;
@@ -154,7 +154,7 @@ class OutboundCall {
 
   std::string ToString() const;
 
-  void DumpPB(const DumpRunningRpcsRequestPB& req, RpcCallInProgressPB* resp);
+  void DumpPB(const DumpConnectionsRequestPB& req, RpcCallInProgressPB* resp);
 
   ////////////////////////////////////////////////////////////
   // Getters
index c1832ef..b9308b5 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -273,8 +273,8 @@ Status ReactorThread::GetMetrics(ReactorMetrics* metrics) {
   return Status::OK();
 }
 
-Status ReactorThread::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
-                                      DumpRunningRpcsResponsePB* resp) {
+Status ReactorThread::DumpConnections(const DumpConnectionsRequestPB& req,
+                                      DumpConnectionsResponsePB* resp) {
   DCHECK(IsCurrentThread());
   for (const scoped_refptr<Connection>& conn : server_conns_) {
     RETURN_NOT_OK(conn->DumpPB(req, resp->add_inbound_connections()));
@@ -800,10 +800,10 @@ Status Reactor::RunOnReactorThread(const boost::function<Status()>&
f) {
   return task.Wait();
 }
 
-Status Reactor::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
-                                DumpRunningRpcsResponsePB* resp) {
-  return RunOnReactorThread(boost::bind(&ReactorThread::DumpRunningRpcs, &thread_,
-                                        boost::ref(req), resp));
+Status Reactor::DumpConnections(const DumpConnectionsRequestPB& req,
+                                DumpConnectionsResponsePB* resp) {
+  return RunOnReactorThread(boost::bind(&ReactorThread::DumpConnections,
+                                        &thread_, boost::ref(req), resp));
 }
 
 class RegisterConnectionTask : public ReactorTask {
diff --git a/src/kudu/rpc/reactor.h b/src/kudu/rpc/reactor.h
index ce251c1..a74a3bb 100644
--- a/src/kudu/rpc/reactor.h
+++ b/src/kudu/rpc/reactor.h
@@ -49,8 +49,8 @@ namespace rpc {
 
 typedef std::list<scoped_refptr<Connection>> conn_list_t;
 
-class DumpRunningRpcsRequestPB;
-class DumpRunningRpcsResponsePB;
+class DumpConnectionsRequestPB;
+class DumpConnectionsResponsePB;
 class OutboundCall;
 class Reactor;
 class ReactorThread;
@@ -149,8 +149,8 @@ class ReactorThread {
   Status Init();
 
   // Add any connections on this reactor thread into the given status dump.
-  Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
-                         DumpRunningRpcsResponsePB* resp);
+  Status DumpConnections(const DumpConnectionsRequestPB& req,
+                         DumpConnectionsResponsePB* resp);
 
   // Shuts down a reactor thread, optionally waiting for it to exit.
   // Reactor::Shutdown() must have been called already.
@@ -356,8 +356,8 @@ class Reactor {
   Status GetMetrics(ReactorMetrics *metrics);
 
   // Add any connections on this reactor thread into the given status dump.
-  Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
-                         DumpRunningRpcsResponsePB* resp);
+  Status DumpConnections(const DumpConnectionsRequestPB& req,
+                         DumpConnectionsResponsePB* resp);
 
   // Queue a new incoming connection. Takes ownership of the underlying fd from
   // 'socket', but not the Socket object itself.
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 2af84a1..0d70bbf 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -534,12 +534,25 @@ TEST_P(TestRpc, TestClientConnectionMetrics) {
     });
 
     // Test the OutboundTransfer queue.
-    DumpRunningRpcsRequestPB dump_req;
-    DumpRunningRpcsResponsePB dump_resp;
+    DumpConnectionsRequestPB dump_req;
+    DumpConnectionsResponsePB dump_resp;
     dump_req.set_include_traces(false);
-    ASSERT_OK(client_messenger->DumpRunningRpcs(dump_req, &dump_resp));
+    ASSERT_OK(client_messenger->DumpConnections(dump_req, &dump_resp));
     ASSERT_EQ(1, dump_resp.outbound_connections_size());
-    ASSERT_GT(dump_resp.outbound_connections(0).outbound_queue_size(), 0);
+    const auto& conn = dump_resp.outbound_connections(0);
+    ASSERT_GT(conn.outbound_queue_size(), 0);
+
+#ifdef __linux__
+    // Test that the socket statistics are present. We only assert on those that
+    // we know to be present on all kernel versions.
+    ASSERT_TRUE(conn.has_socket_stats());
+    ASSERT_GT(conn.socket_stats().rtt(), 0);
+    ASSERT_GT(conn.socket_stats().rttvar(), 0);
+    ASSERT_GT(conn.socket_stats().snd_cwnd(), 0);
+    ASSERT_GT(conn.socket_stats().send_bytes_per_sec(), 0);
+    ASSERT_TRUE(conn.socket_stats().has_send_queue_bytes());
+    ASSERT_TRUE(conn.socket_stats().has_receive_queue_bytes());
+#endif
 
     // Unblock all of the calls and wait for them to finish.
     latch.Wait();
diff --git a/src/kudu/rpc/rpc_introspection.proto b/src/kudu/rpc/rpc_introspection.proto
index 7685903..05be722 100644
--- a/src/kudu/rpc/rpc_introspection.proto
+++ b/src/kudu/rpc/rpc_introspection.proto
@@ -50,6 +50,33 @@ message RpcCallInProgressPB {
   optional State state = 4;
 }
 
+// The SocketStatsPB message is used to report on socket-level information
+// for RPC-related TCP connections (Linux-only). Essentially, the message
+// contains some metrics and counters from the Linux-specific 'tcp_info' structure
+// defined in /usr/include/linux/tcp.h. For more information on TCP on
+// Linux, see http://man7.org/linux/man-pages/man7/tcp.7.html
+message SocketStatsPB {
+  optional uint32 rtt = 1;
+  optional uint32 rttvar = 2;
+  optional uint32 snd_cwnd = 3;
+  optional uint32 total_retrans = 4;
+
+  optional uint64 pacing_rate = 5;      // tcpi_pacing_rate is 64-bit in tcp_info.
+  optional uint64 max_pacing_rate = 6;  // tcpi_max_pacing_rate is 64-bit in tcp_info.
+
+  optional uint64 bytes_acked = 7;
+  optional uint64 bytes_received = 8;
+  optional uint32 segs_out = 9;
+  optional uint32 segs_in = 10;
+
+  optional uint64 send_queue_bytes = 11;
+  optional uint64 receive_queue_bytes = 12;
+
+  // Calculated sender throughput in bytes/sec; 64-bit since it can exceed 2^31.
+  optional int64 send_bytes_per_sec = 13;
+}
+
+// Debugging information about a currently-open RPC connection.
 message RpcConnectionPB {
   enum StateType {
     UNKNOWN = 999;
@@ -63,13 +90,16 @@ message RpcConnectionPB {
   optional string remote_user_credentials = 3;
   repeated RpcCallInProgressPB calls_in_flight = 4;
   optional int64 outbound_queue_size = 5;
+
+  // Information on the actual TCP connection as reported by the kernel.
+  optional SocketStatsPB socket_stats = 6;
 }
 
-message DumpRunningRpcsRequestPB {
+message DumpConnectionsRequestPB {
   optional bool include_traces = 1 [ default = false ];
 }
 
-message DumpRunningRpcsResponsePB {
+message DumpConnectionsResponsePB {
   repeated RpcConnectionPB inbound_connections = 1;
   repeated RpcConnectionPB outbound_connections = 2;
 }
diff --git a/src/kudu/rpc/rpc_stub-test.cc b/src/kudu/rpc/rpc_stub-test.cc
index dc92f2e..f7ba53a 100644
--- a/src/kudu/rpc/rpc_stub-test.cc
+++ b/src/kudu/rpc/rpc_stub-test.cc
@@ -593,11 +593,11 @@ TEST_F(RpcStubTest, TestDumpCallsInFlight) {
                boost::bind(&CountDownLatch::CountDown, &sleep.latch));
 
   // Check the running RPC status on the client messenger.
-  DumpRunningRpcsRequestPB dump_req;
-  DumpRunningRpcsResponsePB dump_resp;
+  DumpConnectionsRequestPB dump_req;
+  DumpConnectionsResponsePB dump_resp;
   dump_req.set_include_traces(true);
 
-  ASSERT_OK(client_messenger_->DumpRunningRpcs(dump_req, &dump_resp));
+  ASSERT_OK(client_messenger_->DumpConnections(dump_req, &dump_resp));
   LOG(INFO) << "client messenger: " << SecureDebugString(dump_resp);
   ASSERT_EQ(1, dump_resp.outbound_connections_size());
   ASSERT_EQ(1, dump_resp.outbound_connections(0).calls_in_flight_size());
@@ -610,7 +610,7 @@ TEST_F(RpcStubTest, TestDumpCallsInFlight) {
   // asynchronously off of the main thread (ie the server may not be handling it yet)
   for (int i = 0; i < 100; i++) {
     dump_resp.Clear();
-    ASSERT_OK(server_messenger_->DumpRunningRpcs(dump_req, &dump_resp));
+    ASSERT_OK(server_messenger_->DumpConnections(dump_req, &dump_resp));
     if (dump_resp.inbound_connections_size() > 0 &&
         dump_resp.inbound_connections(0).calls_in_flight_size() > 0) {
       break;
diff --git a/src/kudu/server/rpcz-path-handler.cc b/src/kudu/server/rpcz-path-handler.cc
index 80a2840..97f8faa 100644
--- a/src/kudu/server/rpcz-path-handler.cc
+++ b/src/kudu/server/rpcz-path-handler.cc
@@ -33,8 +33,8 @@
 #include "kudu/util/jsonwriter.h"
 #include "kudu/util/web_callback_registry.h"
 
-using kudu::rpc::DumpRunningRpcsRequestPB;
-using kudu::rpc::DumpRunningRpcsResponsePB;
+using kudu::rpc::DumpConnectionsRequestPB;
+using kudu::rpc::DumpConnectionsResponsePB;
 using kudu::rpc::DumpRpczStoreRequestPB;
 using kudu::rpc::DumpRpczStoreResponsePB;
 using kudu::rpc::Messenger;
@@ -49,14 +49,14 @@ namespace {
 void RpczPathHandler(const shared_ptr<Messenger>& messenger,
                      const Webserver::WebRequest& req,
                      Webserver::PrerenderedWebResponse* resp) {
-  DumpRunningRpcsResponsePB running_rpcs;
+  DumpConnectionsResponsePB running_rpcs;
   {
-    DumpRunningRpcsRequestPB dump_req;
+    DumpConnectionsRequestPB dump_req;
 
     string arg = FindWithDefault(req.parsed_args, "include_traces", "false");
     dump_req.set_include_traces(ParseLeadingBoolValue(arg.c_str(), false));
 
-    messenger->DumpRunningRpcs(dump_req, &running_rpcs);
+    messenger->DumpConnections(dump_req, &running_rpcs);
   }
   DumpRpczStoreResponsePB sampled_rpcs;
   {


Mime
View raw message