impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tarmstr...@apache.org
Subject [7/9] incubator-impala git commit: KUDU-2032 (part 1): pass pre-resolution hostname into RPC proxies
Date Tue, 05 Sep 2017 22:26:52 GMT
KUDU-2032 (part 1): pass pre-resolution hostname into RPC proxies

This modifies the constructor of RPC proxies (generated and otherwise)
to take the remote hostname in addition to the existing resolved
Sockaddr parameter. The hostname is then passed into the ConnectionId
object, and plumbed through to the SASL client in place of the IP
address that was used previously.

The patch changes all of the construction sites of Proxy to fit the new
interface. In most of the test cases, we don't have real hostnames, so
we just use the dotted-decimal string form of the remote Sockaddr, which
matches the existing behavior.

In the real call sites, we have actual host names typically specified by
the user, and in those cases we'll need to pass those into the proxy. In
a few cases, they were conveniently available in the same function that
creates the proxy. In others, they are relatively far away, so this
patch just uses the dotted-decimal string and leaves TODOs.

In the case that Kerberos is not configured, this change should have no
effect since the hostname is ignored by SASL "plain". In the case that
Kerberos is configured with 'rdns=true', they also have no effect,
because the krb5 library will resolve and reverse the hostname from the
IP as it did before. In the case that 'rdns=false', this moves us one
step closer to fixing KUDU-2032 by getting a hostname into the SASL
library.

I verified that, if I set 'rdns = false' on a Kerberized client, I'm now
able to run  'kudu master status <host>' successfully where it would not
before. This tool uses a direct proxy instantiation where the hostname
was easy to plumb in. 'kudu table list <host>' still does not work because
it uses the client, which wasn't convenient to plumb quite yet.

Given that this makes incremental improvement towards fixing the issue
without any regression, and is already a fairly wide patch, I hope to
commit this and then address the remaining plumbing in a separate patch.

Change-Id: I96fb3c73382f0be6e30e29ae2e7176be42f3bb98
Reviewed-on: http://gerrit.cloudera.org:8080/7687
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/7897
Reviewed-by: Sailesh Mukil <sailesh@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/e4a2d226
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/e4a2d226
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/e4a2d226

Branch: refs/heads/master
Commit: e4a2d226a7649b29c346b3f4f072984a938af787
Parents: 2774d80
Author: Todd Lipcon <todd@apache.org>
Authored: Tue Aug 15 18:20:48 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Sat Sep 2 08:21:21 2017 +0000

----------------------------------------------------------------------
 be/src/kudu/rpc/connection.h             | 15 +++---
 be/src/kudu/rpc/connection_id.cc         | 25 ++++++---
 be/src/kudu/rpc/connection_id.h          | 10 +++-
 be/src/kudu/rpc/exactly_once_rpc-test.cc |  3 +-
 be/src/kudu/rpc/mt-rpc-test.cc           |  6 ++-
 be/src/kudu/rpc/negotiation.cc           |  9 ++--
 be/src/kudu/rpc/protoc-gen-krpc.cc       | 11 ++--
 be/src/kudu/rpc/proxy.cc                 |  6 ++-
 be/src/kudu/rpc/proxy.h                  |  5 +-
 be/src/kudu/rpc/reactor.cc               |  5 +-
 be/src/kudu/rpc/rpc-bench.cc             |  4 +-
 be/src/kudu/rpc/rpc-test.cc              | 73 ++++++++++++++++++---------
 be/src/kudu/rpc/rpc_stub-test.cc         | 47 +++++++++--------
 be/src/kudu/util/net/net_util-test.cc    |  1 +
 be/src/kudu/util/net/sockaddr.cc         |  4 +-
 be/src/kudu/util/net/sockaddr.h          |  3 ++
 16 files changed, 142 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/connection.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/connection.h b/be/src/kudu/rpc/connection.h
index 7f16b7b..9a78c14 100644
--- a/be/src/kudu/rpc/connection.h
+++ b/be/src/kudu/rpc/connection.h
@@ -131,15 +131,17 @@ class Connection : public RefCountedThreadSafe<Connection> {
   const Sockaddr &remote() const { return remote_; }
 
   // Set the user credentials for an outbound connection.
-  void set_local_user_credentials(UserCredentials creds) {
+  void set_outbound_connection_id(ConnectionId conn_id) {
     DCHECK_EQ(direction_, CLIENT);
-    local_user_credentials_ = std::move(creds);
+    DCHECK(!outbound_connection_id_);
+    outbound_connection_id_ = std::move(conn_id);
   }
 
   // Get the user credentials which will be used to log in.
-  const UserCredentials& local_user_credentials() const {
+  const ConnectionId& outbound_connection_id() const {
     DCHECK_EQ(direction_, CLIENT);
-    return local_user_credentials_;
+    DCHECK(outbound_connection_id_);
+    return *outbound_connection_id_;
   }
 
   // Credentials policy to start connection negotiation.
@@ -288,8 +290,9 @@ class Connection : public RefCountedThreadSafe<Connection> {
   // The socket we're communicating on.
   std::unique_ptr<Socket> socket_;
 
-  // The credentials of the user operating on this connection (if a client user).
-  UserCredentials local_user_credentials_;
+  // The ConnectionId that serves as a key into the client connection map
+  // within this reactor. Only set in the case of outbound connections.
+  boost::optional<ConnectionId> outbound_connection_id_;
 
   // The authenticated remote user (if this is an inbound connection on the server).
   RemoteUser remote_user_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/connection_id.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/connection_id.cc b/be/src/kudu/rpc/connection_id.cc
index a17b783..5e24086 100644
--- a/be/src/kudu/rpc/connection_id.cc
+++ b/be/src/kudu/rpc/connection_id.cc
@@ -28,9 +28,13 @@ namespace rpc {
 
 ConnectionId::ConnectionId() {}
 
-ConnectionId::ConnectionId(const Sockaddr& remote, UserCredentials user_credentials)
{
-  remote_ = remote;
-  user_credentials_ = std::move(user_credentials);
+ConnectionId::ConnectionId(const Sockaddr& remote,
+                           std::string hostname,
+                           UserCredentials user_credentials)
+    : remote_(remote),
+      hostname_(std::move(hostname)),
+      user_credentials_(std::move(user_credentials)) {
+  CHECK(!hostname_.empty());
 }
 
 void ConnectionId::set_user_credentials(UserCredentials user_credentials) {
@@ -38,21 +42,30 @@ void ConnectionId::set_user_credentials(UserCredentials user_credentials)
{
 }
 
 string ConnectionId::ToString() const {
+  string remote;
+  if (hostname_ != remote_.host()) {
+    remote = strings::Substitute("$0 ($1)", remote_.ToString(), hostname_);
+  } else {
+    remote = remote_.ToString();
+  }
+
   return strings::Substitute("{remote=$0, user_credentials=$1}",
-                             remote_.ToString(),
+                             remote,
                              user_credentials_.ToString());
 }
 
 size_t ConnectionId::HashCode() const {
   size_t seed = 0;
   boost::hash_combine(seed, remote_.HashCode());
+  boost::hash_combine(seed, hostname_);
   boost::hash_combine(seed, user_credentials_.HashCode());
   return seed;
 }
 
 bool ConnectionId::Equals(const ConnectionId& other) const {
-  return (remote() == other.remote()
-       && user_credentials().Equals(other.user_credentials()));
+  return remote() == other.remote() &&
+      hostname_ == other.hostname_ &&
+      user_credentials().Equals(other.user_credentials());
 }
 
 size_t ConnectionIdHash::operator() (const ConnectionId& conn_id) const {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/connection_id.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/connection_id.h b/be/src/kudu/rpc/connection_id.h
index 09f1738..195b47c 100644
--- a/be/src/kudu/rpc/connection_id.h
+++ b/be/src/kudu/rpc/connection_id.h
@@ -35,11 +35,15 @@ class ConnectionId {
   ConnectionId(const ConnectionId& other) = default;
 
   // Convenience constructor.
-  ConnectionId(const Sockaddr& remote, UserCredentials user_credentials);
+  ConnectionId(const Sockaddr& remote,
+               std::string hostname,
+               UserCredentials user_credentials);
 
   // The remote address.
   const Sockaddr& remote() const { return remote_; }
 
+  const std::string& hostname() const { return hostname_; }
+
   // The credentials of the user associated with this connection, if any.
   void set_user_credentials(UserCredentials user_credentials);
 
@@ -58,6 +62,10 @@ class ConnectionId {
   // Remember to update HashCode() and Equals() when new fields are added.
   Sockaddr remote_;
 
+  // The original host name before it was resolved to 'remote_'.
+  // This must be retained since it is used to compute Kerberos Service Principal Names (SPNs).
+  std::string hostname_;
+
   UserCredentials user_credentials_;
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/exactly_once_rpc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/exactly_once_rpc-test.cc b/be/src/kudu/rpc/exactly_once_rpc-test.cc
index 388919d..e16681d 100644
--- a/be/src/kudu/rpc/exactly_once_rpc-test.cc
+++ b/be/src/kudu/rpc/exactly_once_rpc-test.cc
@@ -166,7 +166,8 @@ class ExactlyOnceRpcTest : public RpcTestBase {
     // Set up server.
     StartTestServerWithGeneratedCode(&server_addr_);
     client_messenger_ = CreateMessenger("Client");
-    proxy_.reset(new CalculatorServiceProxy(client_messenger_, server_addr_));
+    proxy_.reset(new CalculatorServiceProxy(
+        client_messenger_, server_addr_, server_addr_.host()));
     test_picker_.reset(new TestServerPicker(proxy_.get()));
     request_tracker_.reset(new RequestTracker(kClientId));
     attempt_nos_ = 0;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/mt-rpc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/mt-rpc-test.cc b/be/src/kudu/rpc/mt-rpc-test.cc
index 73e3a13..10056ae 100644
--- a/be/src/kudu/rpc/mt-rpc-test.cc
+++ b/be/src/kudu/rpc/mt-rpc-test.cc
@@ -45,7 +45,8 @@ class MultiThreadedRpcTest : public RpcTestBase {
                   Status* result, CountDownLatch* latch) {
     LOG(INFO) << "Connecting to " << server_addr.ToString();
     shared_ptr<Messenger> client_messenger(CreateMessenger("ClientSC"));
-    Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+    Proxy p(client_messenger, server_addr, server_addr.host(),
+            GenericCalculatorService::static_service_name());
     *result = DoTestSyncCall(p, method_name);
     latch->CountDown();
   }
@@ -61,7 +62,8 @@ class MultiThreadedRpcTest : public RpcTestBase {
       Sockaddr server_addr, const char* method_name, Status* last_result,
       const shared_ptr<Messenger>& messenger) {
     LOG(INFO) << "Connecting to " << server_addr.ToString();
-    Proxy p(messenger, server_addr, GenericCalculatorService::static_service_name());
+    Proxy p(messenger, server_addr, server_addr.host(),
+            GenericCalculatorService::static_service_name());
 
     int i = 0;
     while (true) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/negotiation.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/negotiation.cc b/be/src/kudu/rpc/negotiation.cc
index 66a0112..0c5ca3f 100644
--- a/be/src/kudu/rpc/negotiation.cc
+++ b/be/src/kudu/rpc/negotiation.cc
@@ -169,11 +169,7 @@ static Status DoClientNegotiation(Connection* conn,
                                        authn_token,
                                        encryption);
 
-  // Note that the fqdn is an IP address here: we've already lost whatever DNS
-  // name the client was attempting to use. Unless krb5 is configured with 'rdns
-  // = false', it will automatically take care of reversing this address to its
-  // canonical hostname to determine the expected server principal.
-  client_negotiation.set_server_fqdn(conn->remote().host());
+  client_negotiation.set_server_fqdn(conn->outbound_connection_id().hostname());
 
   if (authentication != RpcAuthentication::DISABLED) {
     Status s = client_negotiation.EnableGSSAPI();
@@ -202,7 +198,8 @@ static Status DoClientNegotiation(Connection* conn,
   }
 
   if (authentication != RpcAuthentication::REQUIRED) {
-    RETURN_NOT_OK(client_negotiation.EnablePlain(conn->local_user_credentials().real_user(),
""));
+    const auto& creds = conn->outbound_connection_id().user_credentials();
+    RETURN_NOT_OK(client_negotiation.EnablePlain(creds.real_user(), ""));
   }
 
   client_negotiation.set_deadline(deadline);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/protoc-gen-krpc.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/protoc-gen-krpc.cc b/be/src/kudu/rpc/protoc-gen-krpc.cc
index e892897..7873d08 100644
--- a/be/src/kudu/rpc/protoc-gen-krpc.cc
+++ b/be/src/kudu/rpc/protoc-gen-krpc.cc
@@ -576,8 +576,9 @@ class CodeGenerator : public ::google::protobuf::compiler::CodeGenerator
{
       Print(printer, *subs,
         "class $service_name$Proxy : public ::kudu::rpc::Proxy {\n"
         " public:\n"
-        "  $service_name$Proxy(const std::shared_ptr< ::kudu::rpc::Messenger>\n"
-        "                &messenger, const ::kudu::Sockaddr &sockaddr);\n"
+        "  $service_name$Proxy(std::shared_ptr<::kudu::rpc::Messenger>\n"
+        "                messenger, const ::kudu::Sockaddr &sockaddr,"
+        "                std::string hostname);\n"
         "  ~$service_name$Proxy();\n"
         "\n"
         );
@@ -631,9 +632,9 @@ class CodeGenerator : public ::google::protobuf::compiler::CodeGenerator
{
       subs->PushService(service);
       Print(printer, *subs,
         "$service_name$Proxy::$service_name$Proxy(\n"
-        "   const std::shared_ptr< ::kudu::rpc::Messenger> &messenger,\n"
-        "   const ::kudu::Sockaddr &remote)\n"
-        "  : Proxy(messenger, remote, \"$full_service_name$\") {\n"
+        "   std::shared_ptr< ::kudu::rpc::Messenger> messenger,\n"
+        "   const ::kudu::Sockaddr &remote, std::string hostname)\n"
+        "  : Proxy(std::move(messenger), remote, std::move(hostname), \"$full_service_name$\")
{\n"
         "}\n"
         "\n"
         "$service_name$Proxy::~$service_name$Proxy() {\n"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/proxy.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/proxy.cc b/be/src/kudu/rpc/proxy.cc
index 0d946ed..4374cb1 100644
--- a/be/src/kudu/rpc/proxy.cc
+++ b/be/src/kudu/rpc/proxy.cc
@@ -48,7 +48,9 @@ namespace kudu {
 namespace rpc {
 
 Proxy::Proxy(std::shared_ptr<Messenger> messenger,
-             const Sockaddr& remote, string service_name)
+             const Sockaddr& remote,
+             string hostname,
+             string service_name)
     : service_name_(std::move(service_name)),
       messenger_(std::move(messenger)),
       is_started_(false) {
@@ -66,7 +68,7 @@ Proxy::Proxy(std::shared_ptr<Messenger> messenger,
 
   UserCredentials creds;
   creds.set_real_user(std::move(real_user));
-  conn_id_ = ConnectionId(remote, std::move(creds));
+  conn_id_ = ConnectionId(remote, std::move(hostname), std::move(creds));
 }
 
 Proxy::~Proxy() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/proxy.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/proxy.h b/be/src/kudu/rpc/proxy.h
index 6e43e93..a3d852e 100644
--- a/be/src/kudu/rpc/proxy.h
+++ b/be/src/kudu/rpc/proxy.h
@@ -55,8 +55,11 @@ class Messenger;
 // After initialization, multiple threads may make calls using the same proxy object.
 class Proxy {
  public:
-  Proxy(std::shared_ptr<Messenger> messenger, const Sockaddr& remote,
+  Proxy(std::shared_ptr<Messenger> messenger,
+        const Sockaddr& remote,
+        std::string hostname,
         std::string service_name);
+
   ~Proxy();
 
   // Call a remote method asynchronously.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/reactor.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/reactor.cc b/be/src/kudu/rpc/reactor.cc
index f55cca0..b935e68 100644
--- a/be/src/kudu/rpc/reactor.cc
+++ b/be/src/kudu/rpc/reactor.cc
@@ -448,7 +448,7 @@ Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id,
   // Register the new connection in our map.
   *conn = new Connection(
       this, conn_id.remote(), std::move(new_socket), Connection::CLIENT, cred_policy);
-  (*conn)->set_local_user_credentials(conn_id.user_credentials());
+  (*conn)->set_outbound_connection_id(conn_id);
 
   // Kick off blocking client connection negotiation.
   Status s = StartConnectionNegotiation(*conn);
@@ -546,8 +546,7 @@ void ReactorThread::DestroyConnection(Connection *conn,
 
   // Unlink connection from lists.
   if (conn->direction() == Connection::CLIENT) {
-    ConnectionId conn_id(conn->remote(), conn->local_user_credentials());
-    const auto range = client_conns_.equal_range(conn_id);
+    const auto range = client_conns_.equal_range(conn->outbound_connection_id());
     CHECK(range.first != range.second) << "Couldn't find connection " << conn->ToString();
     // The client_conns_ container is a multi-map.
     for (auto it = range.first; it != range.second;) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/rpc-bench.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-bench.cc b/be/src/kudu/rpc/rpc-bench.cc
index d569ea1..47be939 100644
--- a/be/src/kudu/rpc/rpc-bench.cc
+++ b/be/src/kudu/rpc/rpc-bench.cc
@@ -129,7 +129,7 @@ class ClientThread {
   void Run() {
     shared_ptr<Messenger> client_messenger = bench_->CreateMessenger("Client");
 
-    CalculatorServiceProxy p(client_messenger, bench_->server_addr_);
+    CalculatorServiceProxy p(client_messenger, bench_->server_addr_, "localhost");
 
     AddRequestPB req;
     AddResponsePB resp;
@@ -182,7 +182,7 @@ class ClientAsyncWorkload {
       messenger_(std::move(messenger)),
       request_count_(0) {
     controller_.set_timeout(MonoDelta::FromSeconds(10));
-    proxy_.reset(new CalculatorServiceProxy(messenger_, bench_->server_addr_));
+    proxy_.reset(new CalculatorServiceProxy(messenger_, bench_->server_addr_, "localhost"));
   }
 
   void CallOneRpc() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-test.cc b/be/src/kudu/rpc/rpc-test.cc
index 3989558..a7865ec 100644
--- a/be/src/kudu/rpc/rpc-test.cc
+++ b/be/src/kudu/rpc/rpc-test.cc
@@ -130,7 +130,8 @@ TEST_P(TestRpc, TestNegotiationDeadlock) {
   Sockaddr server_addr;
   StartTestServerWithCustomMessenger(&server_addr, messenger, enable_ssl);
 
-  Proxy p(messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
   ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
 }
 
@@ -144,7 +145,8 @@ TEST_P(TestRpc, TestCall) {
   // Set up client.
   LOG(INFO) << "Connecting to " << server_addr.ToString();
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
   ASSERT_STR_CONTAINS(p.ToString(), strings::Substitute("kudu.rpc.GenericCalculatorService@"
                                                             "{remote=$0, user_credentials=",
                                                         server_addr.ToString()));
@@ -170,7 +172,8 @@ TEST_P(TestRpc, TestCallWithChainCert) {
   // Set up client.
   SCOPED_TRACE(strings::Substitute("Connecting to $0", server_addr.ToString()));
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
   ASSERT_STR_CONTAINS(p.ToString(), strings::Substitute("kudu.rpc.GenericCalculatorService@"
                                                             "{remote=$0, user_credentials=",
                                                         server_addr.ToString()));
@@ -199,7 +202,8 @@ TEST_P(TestRpc, TestCallWithPasswordProtectedKey) {
   // Set up client.
   SCOPED_TRACE(strings::Substitute("Connecting to $0", server_addr.ToString()));
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
   ASSERT_STR_CONTAINS(p.ToString(), strings::Substitute("kudu.rpc.GenericCalculatorService@"
                                                             "{remote=$0, user_credentials=",
                                                         server_addr.ToString()));
@@ -234,7 +238,8 @@ TEST_P(TestRpc, TestCallToBadServer) {
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, GetParam()));
   Sockaddr addr;
   addr.set_port(0);
-  Proxy p(client_messenger, addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, addr, addr.host(),
+          GenericCalculatorService::static_service_name());
 
   // Loop a few calls to make sure that we properly set up and tear down
   // the connections.
@@ -255,7 +260,8 @@ TEST_P(TestRpc, TestInvalidMethodCall) {
   // Set up client.
   LOG(INFO) << "Connecting to " << server_addr.ToString();
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
 
   // Call the method which fails.
   Status s = DoTestSyncCall(p, "ThisMethodDoesNotExist");
@@ -273,7 +279,7 @@ TEST_P(TestRpc, TestWrongService) {
 
   // Set up client with the wrong service name.
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, "WrongServiceName");
+  Proxy p(client_messenger, server_addr, "localhost", "WrongServiceName");
 
   // Call the method which fails.
   Status s = DoTestSyncCall(p, "ThisMethodDoesNotExist");
@@ -308,7 +314,8 @@ TEST_P(TestRpc, TestHighFDs) {
   bool enable_ssl = GetParam();
   StartTestServer(&server_addr, enable_ssl);
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
   ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
 }
 
@@ -327,7 +334,8 @@ TEST_P(TestRpc, TestConnectionKeepalive) {
   // Set up client.
   LOG(INFO) << "Connecting to " << server_addr.ToString();
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
 
   ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
 
@@ -374,7 +382,8 @@ TEST_P(TestRpc, TestReopenOutboundConnections) {
   // Set up client.
   LOG(INFO) << "Connecting to " << server_addr.ToString();
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
 
   // Verify the initial counters.
   ReactorMetrics metrics;
@@ -415,7 +424,8 @@ TEST_P(TestRpc, TestCredentialsPolicy) {
   // Set up client.
   LOG(INFO) << "Connecting to " << server_addr.ToString();
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
 
   // Verify the initial counters.
   ReactorMetrics metrics;
@@ -483,7 +493,8 @@ TEST_P(TestRpc, TestCallLongerThanKeepalive) {
 
   // Set up client.
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
 
   // Make a call which sleeps longer than the keepalive.
   RpcController controller;
@@ -504,7 +515,8 @@ TEST_P(TestRpc, TestRpcSidecar) {
 
   // Set up client.
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, GetParam()));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
 
   // Test a zero-length sidecar
   DoTestSidecar(p, 0, 0);
@@ -543,7 +555,8 @@ TEST_P(TestRpc, TestRpcSidecarLimits) {
 
     // Set up client.
     shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, GetParam()));
-    Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+    Proxy p(client_messenger, server_addr, server_addr.host(),
+            GenericCalculatorService::static_service_name());
 
     RpcController controller;
     string s(FLAGS_rpc_max_message_size + 1, 'a');
@@ -574,7 +587,8 @@ TEST_P(TestRpc, TestCallTimeout) {
   bool enable_ssl = GetParam();
   StartTestServer(&server_addr, enable_ssl);
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
 
   // Test a very short timeout - we expect this will time out while the
   // call is still trying to connect, or in the send queue. This was triggering ASAN failures
@@ -602,7 +616,8 @@ TEST_P(TestRpc, TestCallTimeoutDoesntAffectNegotiation) {
   bool enable_ssl = GetParam();
   StartTestServer(&server_addr, enable_ssl);
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
 
   FLAGS_rpc_negotiation_inject_delay_ms = 500;
   ASSERT_NO_FATAL_FAILURE(DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(50)));
@@ -645,7 +660,8 @@ TEST_F(TestRpc, TestNegotiationTimeout) {
 
   // Set up client.
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
 
   bool is_negotiation_error = false;
   ASSERT_NO_FATAL_FAILURE(DoTestExpectTimeout(
@@ -666,7 +682,8 @@ TEST_F(TestRpc, TestServerShutsDown) {
   // Set up client.
   LOG(INFO) << "Connecting to " << server_addr.ToString();
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
 
   // Send a call.
   AddRequestPB req;
@@ -750,7 +767,8 @@ TEST_P(TestRpc, TestRpcHandlerLatencyMetric) {
 
   // Set up client.
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, CalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          CalculatorService::static_service_name());
 
   RpcController controller;
   SleepRequestPB req;
@@ -798,7 +816,7 @@ TEST_P(TestRpc, TestRpcCallbackDestroysMessenger) {
   RpcController controller;
   controller.set_timeout(MonoDelta::FromMilliseconds(1));
   {
-    Proxy p(client_messenger, bad_addr, "xxx");
+    Proxy p(client_messenger, bad_addr, "xxx-host", "xxx-service");
     p.AsyncRequest("my-fake-method", req, &resp, &controller,
                    boost::bind(&DestroyMessengerCallback, &client_messenger, &latch));
   }
@@ -817,7 +835,8 @@ TEST_P(TestRpc, TestRpcContextClientDeadline) {
 
   // Set up client.
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, CalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          CalculatorService::static_service_name());
 
   SleepRequestPB req;
   req.set_sleep_micros(sleep_micros);
@@ -843,7 +862,8 @@ TEST_P(TestRpc, TestApplicationFeatureFlag) {
 
   // Set up client.
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, CalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          CalculatorService::static_service_name());
 
   { // Supported flag
     AddRequestPB req;
@@ -884,7 +904,8 @@ TEST_P(TestRpc, TestApplicationFeatureFlagUnsupportedServer) {
 
   // Set up client.
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, CalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          CalculatorService::static_service_name());
 
   { // Required flag
     AddRequestPB req;
@@ -919,7 +940,8 @@ TEST_P(TestRpc, TestCancellation) {
   // Set up client.
   LOG(INFO) << "Connecting to " << server_addr.ToString();
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
 
   for (int i = OutboundCall::READY; i <= OutboundCall::FINISHED_SUCCESS; ++i) {
     FLAGS_rpc_inject_cancellation_state = i;
@@ -981,7 +1003,8 @@ TEST_P(TestRpc, TestCancellationAsync) {
   // Set up client.
   LOG(INFO) << "Connecting to " << server_addr.ToString();
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
 
   RpcController controller;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/rpc_stub-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_stub-test.cc b/be/src/kudu/rpc/rpc_stub-test.cc
index 2fe0708..ccb4fd1 100644
--- a/be/src/kudu/rpc/rpc_stub-test.cc
+++ b/be/src/kudu/rpc/rpc_stub-test.cc
@@ -60,7 +60,7 @@ class RpcStubTest : public RpcTestBase {
   }
  protected:
   void SendSimpleCall() {
-    CalculatorServiceProxy p(client_messenger_, server_addr_);
+    CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
     RpcController controller;
     AddRequestPB req;
@@ -84,7 +84,7 @@ TEST_F(RpcStubTest, TestSimpleCall) {
 // reads and then makes a number of calls.
 TEST_F(RpcStubTest, TestShortRecvs) {
   FLAGS_socket_inject_short_recvs = true;
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   for (int i = 0; i < 100; i++) {
     NO_FATALS(SendSimpleCall());
@@ -102,7 +102,7 @@ TEST_F(RpcStubTest, TestBigCallData) {
   string data;
   data.resize(kMessageSize);
 
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   EchoRequestPB req;
   req.set_data(data);
@@ -127,7 +127,7 @@ TEST_F(RpcStubTest, TestBigCallData) {
 }
 
 TEST_F(RpcStubTest, TestRespondDeferred) {
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   RpcController controller;
   SleepRequestPB req;
@@ -139,7 +139,7 @@ TEST_F(RpcStubTest, TestRespondDeferred) {
 
 // Test that the default user credentials are propagated to the server.
 TEST_F(RpcStubTest, TestDefaultCredentialsPropagated) {
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   string expected;
   ASSERT_OK(GetLoggedInUser(&expected));
@@ -155,7 +155,7 @@ TEST_F(RpcStubTest, TestDefaultCredentialsPropagated) {
 // Test that the user can specify other credentials.
 TEST_F(RpcStubTest, TestCustomCredentialsPropagated) {
   const char* const kFakeUserName = "some fake user";
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   UserCredentials creds;
   creds.set_real_user(kFakeUserName);
@@ -172,7 +172,7 @@ TEST_F(RpcStubTest, TestCustomCredentialsPropagated) {
 TEST_F(RpcStubTest, TestAuthorization) {
   // First test calling WhoAmI() as user "alice", who is disallowed.
   {
-    CalculatorServiceProxy p(client_messenger_, server_addr_);
+    CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
     UserCredentials creds;
     creds.set_real_user("alice");
     p.set_user_credentials(creds);
@@ -189,7 +189,7 @@ TEST_F(RpcStubTest, TestAuthorization) {
 
   // Try some calls as "bob".
   {
-    CalculatorServiceProxy p(client_messenger_, server_addr_);
+    CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
     UserCredentials creds;
     creds.set_real_user("bob");
     p.set_user_credentials(creds);
@@ -217,7 +217,7 @@ TEST_F(RpcStubTest, TestAuthorization) {
 
 // Test that the user's remote address is accessible to the server.
 TEST_F(RpcStubTest, TestRemoteAddress) {
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   RpcController controller;
   WhoAmIRequestPB req;
@@ -233,7 +233,8 @@ TEST_F(RpcStubTest, TestRemoteAddress) {
 // Test sending a PB parameter with a missing field, where the client
 // thinks it has sent a full PB. (eg due to version mismatch)
 TEST_F(RpcStubTest, TestCallWithInvalidParam) {
-  Proxy p(client_messenger_, server_addr_, CalculatorService::static_service_name());
+  Proxy p(client_messenger_, server_addr_, server_addr_.host(),
+          CalculatorService::static_service_name());
 
   rpc_test::AddRequestPartialPB req;
   req.set_x(rand());
@@ -258,7 +259,7 @@ static void DoIncrement(Atomic32* count) {
 // This also ensures that the async callback is only called once
 // (regression test for a previously-encountered bug).
 TEST_F(RpcStubTest, TestCallWithMissingPBFieldClientSide) {
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   RpcController controller;
   AddRequestPB req;
@@ -278,7 +279,7 @@ TEST_F(RpcStubTest, TestCallWithMissingPBFieldClientSide) {
 }
 
 TEST_F(RpcStubTest, TestResponseWithMissingField) {
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   RpcController rpc;
   TestInvalidResponseRequestPB req;
@@ -293,7 +294,7 @@ TEST_F(RpcStubTest, TestResponseWithMissingField) {
 // configured RPC message size. The server should send the response, but the client
 // will reject it.
 TEST_F(RpcStubTest, TestResponseLargerThanFrameSize) {
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   RpcController rpc;
   TestInvalidResponseRequestPB req;
@@ -305,7 +306,8 @@ TEST_F(RpcStubTest, TestResponseLargerThanFrameSize) {
 
 // Test sending a call which isn't implemented by the server.
 TEST_F(RpcStubTest, TestCallMissingMethod) {
-  Proxy p(client_messenger_, server_addr_, CalculatorService::static_service_name());
+  Proxy p(client_messenger_, server_addr_, server_addr_.host(),
+          CalculatorService::static_service_name());
 
   Status s = DoTestSyncCall(p, "DoesNotExist");
   ASSERT_TRUE(s.IsRemoteError()) << "Bad status: " << s.ToString();
@@ -313,7 +315,7 @@ TEST_F(RpcStubTest, TestCallMissingMethod) {
 }
 
 TEST_F(RpcStubTest, TestApplicationError) {
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   RpcController controller;
   SleepRequestPB req;
@@ -377,7 +379,7 @@ TEST_F(RpcStubTest, TestRpcPanic) {
     CHECK_OK(env_->DeleteRecursively(test_dir_));
 
     // Make an RPC which causes the server to abort.
-    CalculatorServiceProxy p(client_messenger_, server_addr_);
+    CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
     RpcController controller;
     PanicRequestPB req;
     PanicResponsePB resp;
@@ -395,7 +397,7 @@ struct AsyncSleep {
 };
 
 TEST_F(RpcStubTest, TestDontHandleTimedOutCalls) {
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
   vector<AsyncSleep*> sleeps;
   ElementDeleter d(&sleeps);
 
@@ -459,7 +461,8 @@ TEST_F(RpcStubTest, TestEarliestDeadlineFirstQueue) {
   for (int thread_id = 0; thread_id < num_client_threads; thread_id++) {
     threads.emplace_back([&, thread_id] {
         Random rng(thread_id);
-        CalculatorServiceProxy p(client_messenger_, server_addr_);
+        CalculatorServiceProxy p(
+            client_messenger_, server_addr_, server_addr_.host());
         while (!done.load()) {
           // Set a deadline in the future. We'll keep using this same deadline
           // on each of our retries.
@@ -517,7 +520,7 @@ TEST_F(RpcStubTest, TestEarliestDeadlineFirstQueue) {
 }
 
 TEST_F(RpcStubTest, TestDumpCallsInFlight) {
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
   AsyncSleep sleep;
   sleep.req.set_sleep_micros(100 * 1000); // 100ms
   p.SleepAsync(sleep.req, &sleep.resp, &sleep.rpc,
@@ -561,7 +564,7 @@ TEST_F(RpcStubTest, TestDumpCallsInFlight) {
 }
 
 TEST_F(RpcStubTest, TestDumpSampledCalls) {
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   // Issue two calls that fall into different latency buckets.
   AsyncSleep sleeps[2];
@@ -618,7 +621,7 @@ void MyTestCallback(CountDownLatch* latch, scoped_refptr<RefCountedTest>
my_refp
 // is held. This is important when the callback holds a refcounted ptr,
 // since we expect to be able to release that pointer when the call is done.
 TEST_F(RpcStubTest, TestCallbackClearedAfterRunning) {
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   CountDownLatch latch(1);
   scoped_refptr<RefCountedTest> my_refptr(new RefCountedTest);
@@ -647,7 +650,7 @@ TEST_F(RpcStubTest, DontTimeOutWhenReactorIsBlocked) {
       << "This test requires only a single reactor. Otherwise the injected sleep might
"
       << "be scheduled on a different reactor than the RPC call.";
 
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   // Schedule a 1-second sleep on the reactor thread.
   //

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/util/net/net_util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/net_util-test.cc b/be/src/kudu/util/net/net_util-test.cc
index c77b054..106a020 100644
--- a/be/src/kudu/util/net/net_util-test.cc
+++ b/be/src/kudu/util/net/net_util-test.cc
@@ -53,6 +53,7 @@ TEST(SockaddrTest, Test) {
   Sockaddr addr;
   ASSERT_OK(addr.ParseString("1.1.1.1:12345", 12345));
   ASSERT_EQ(12345, addr.port());
+  ASSERT_EQ("1.1.1.1", addr.host());
 }
 
 TEST_F(NetUtilTest, TestParseAddresses) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/util/net/sockaddr.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/sockaddr.cc b/be/src/kudu/util/net/sockaddr.cc
index ed249c7..a462c05 100644
--- a/be/src/kudu/util/net/sockaddr.cc
+++ b/be/src/kudu/util/net/sockaddr.cc
@@ -98,9 +98,7 @@ const struct sockaddr_in& Sockaddr::addr() const {
 }
 
 std::string Sockaddr::ToString() const {
-  char str[INET_ADDRSTRLEN];
-  ::inet_ntop(AF_INET, &addr_.sin_addr, str, INET_ADDRSTRLEN);
-  return StringPrintf("%s:%d", str, port());
+  return Substitute("$0:$1", host(), port());
 }
 
 bool Sockaddr::IsWildcard() const {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/util/net/sockaddr.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/sockaddr.h b/be/src/kudu/util/net/sockaddr.h
index 09777f3..01506e7 100644
--- a/be/src/kudu/util/net/sockaddr.h
+++ b/be/src/kudu/util/net/sockaddr.h
@@ -54,11 +54,14 @@ class Sockaddr {
 
   uint32_t HashCode() const;
 
+  // Returns the dotted-decimal string '1.2.3.4' of the host component of this address.
   std::string host() const;
 
   void set_port(int port);
   int port() const;
   const struct sockaddr_in& addr() const;
+
+  // Returns the stringified address in '1.2.3.4:<port>' format.
   std::string ToString() const;
 
   // Returns true if the address is 0.0.0.0



Mime
View raw message