kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [2/2] kudu git commit: KUDU-2032 (part 2): propagate master hostnames into client
Date Thu, 05 Oct 2017 17:42:43 GMT
KUDU-2032 (part 2): propagate master hostnames into client

This changes the client code to remember the user-specified master
addresses and propagate them into the creation of master proxies.

It's not possible to reproduce the necessary DNS configurations in a
minicluster test, but with this patch I am now able to use 'kudu perf
loadgen' against a Kerberized cluster even when my local krb5.conf has
rdns=false.

Change-Id: I4bd299f72907a0d9442dd3eb45fcbaa2de4f73ef
Reviewed-on: http://gerrit.cloudera.org:8080/8186
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/acc8fe47
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/acc8fe47
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/acc8fe47

Branch: refs/heads/branch-1.4.x
Commit: acc8fe4734b572764be867083b024733e8959d33
Parents: 0fd6b4e
Author: Todd Lipcon <todd@apache.org>
Authored: Wed Aug 16 17:44:39 2017 -0700
Committer: Will Berkeley <wdberkeley@gmail.com>
Committed: Thu Oct 5 05:35:08 2017 +0000

----------------------------------------------------------------------
 src/kudu/client/client-internal.cc              | 27 +++++++-----
 src/kudu/client/client-internal.h               |  3 +-
 src/kudu/client/master_rpc.cc                   | 43 ++++++++++++--------
 src/kudu/client/master_rpc.h                    | 12 +++---
 .../integration-tests/external_mini_cluster.cc  | 12 +++---
 5 files changed, 58 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/acc8fe47/src/kudu/client/client-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index f3d3526..63535d4 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -23,6 +23,7 @@
 #include <memory>
 #include <mutex>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include "kudu/client/master_rpc.h"
@@ -50,6 +51,7 @@
 #include "kudu/util/pb_util.h"
 #include "kudu/util/thread_restrictions.h"
 
+using std::pair;
 using std::set;
 using std::shared_ptr;
 using std::string;
@@ -625,10 +627,13 @@ Status KuduClient::Data::GetTableSchema(KuduClient* client,
 
 void KuduClient::Data::ConnectedToClusterCb(
     const Status& status,
-    const Sockaddr& leader_addr,
+    const pair<Sockaddr, string>& leader_addr_and_name,
     const master::ConnectToMasterResponsePB& connect_response,
     CredentialsPolicy cred_policy) {
 
+  const auto& leader_addr = leader_addr_and_name.first;
+  const auto& leader_hostname = leader_addr_and_name.second;
+
   // Ensure that all of the CAs reported by the master are trusted
   // in our local TLS configuration.
   if (status.ok()) {
@@ -669,9 +674,8 @@ void KuduClient::Data::ConnectedToClusterCb(
     }
 
     if (status.ok()) {
-      leader_master_hostport_ = HostPort(leader_addr);
-      // TODO(KUDU-2032): retain the original hostname passed by caller.
-      master_proxy_.reset(new MasterServiceProxy(messenger_, leader_addr, leader_addr.host()));
+      leader_master_hostport_ = HostPort(leader_hostname, leader_addr.port());
+      master_proxy_.reset(new MasterServiceProxy(messenger_, leader_addr, leader_hostname));
     }
   }
 
@@ -694,12 +698,15 @@ void KuduClient::Data::ConnectToClusterAsync(KuduClient* client,
                                              CredentialsPolicy creds_policy) {
   DCHECK(deadline.Initialized());
 
-  vector<Sockaddr> master_sockaddrs;
+  vector<pair<Sockaddr, string>> master_addrs_with_names;
   for (const string& master_server_addr : master_server_addrs_) {
     vector<Sockaddr> addrs;
-    Status s;
-    // TODO: Do address resolution asynchronously as well.
-    s = ParseAddressList(master_server_addr, master::Master::kDefaultPort, &addrs);
+    HostPort hp;
+    Status s = hp.ParseString(master_server_addr, master::Master::kDefaultPort);
+    if (s.ok()) {
+      // TODO(todd): Do address resolution asynchronously as well.
+      s = hp.ResolveAddresses(&addrs);
+    }
     if (!s.ok()) {
       cb.Run(s);
       return;
@@ -714,7 +721,7 @@ void KuduClient::Data::ConnectToClusterAsync(KuduClient* client,
           << "Specified master server address '" << master_server_addr << "' "
           << "resolved to multiple IPs. Using " << addrs[0].ToString();
     }
-    master_sockaddrs.push_back(addrs[0]);
+    master_addrs_with_names.emplace_back(addrs[0], hp.host());
   }
 
   // This ensures that no more than one ConnectToClusterRpc of each credentials
@@ -761,7 +768,7 @@ void KuduClient::Data::ConnectToClusterAsync(KuduClient* client,
             std::placeholders::_2,
             std::placeholders::_3,
             creds_policy),
-        std::move(master_sockaddrs),
+        std::move(master_addrs_with_names),
         deadline,
         client->default_rpc_timeout(),
         messenger_,

http://git-wip-us.apache.org/repos/asf/kudu/blob/acc8fe47/src/kudu/client/client-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-internal.h b/src/kudu/client/client-internal.h
index a7a1434..0d75ed0 100644
--- a/src/kudu/client/client-internal.h
+++ b/src/kudu/client/client-internal.h
@@ -22,6 +22,7 @@
 #include <set>
 #include <string>
 #include <unordered_set>
+#include <utility>
 #include <vector>
 
 #include <boost/function.hpp>
@@ -139,7 +140,7 @@ class KuduClient::Data {
   //
   // See also: ConnectToClusterAsync.
   void ConnectedToClusterCb(const Status& status,
-                            const Sockaddr& leader_addr,
+                            const std::pair<Sockaddr, std::string>& leader_addr_and_name,
                             const master::ConnectToMasterResponsePB& connect_response,
                             rpc::CredentialsPolicy cred_policy);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/acc8fe47/src/kudu/client/master_rpc.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/master_rpc.cc b/src/kudu/client/master_rpc.cc
index 8d3049f..1c5f848 100644
--- a/src/kudu/client/master_rpc.cc
+++ b/src/kudu/client/master_rpc.cc
@@ -20,6 +20,8 @@
 #include "kudu/client/master_rpc.h"
 
 #include <mutex>
+#include <ostream>
+#include <utility>
 
 #include <boost/bind.hpp>
 
@@ -34,6 +36,7 @@
 #include "kudu/util/scoped_cleanup.h"
 
 
+using std::pair;
 using std::shared_ptr;
 using std::string;
 using std::vector;
@@ -48,6 +51,7 @@ using kudu::master::MasterServiceProxy;
 using kudu::rpc::CredentialsPolicy;
 using kudu::rpc::Messenger;
 using kudu::rpc::Rpc;
+using strings::Substitute;
 
 namespace kudu {
 namespace client {
@@ -68,7 +72,7 @@ class ConnectToMasterRpc : public rpc::Rpc {
   //
   // Invokes 'user_cb' upon failure or success of the RPC call.
   ConnectToMasterRpc(StatusCallback user_cb,
-                     const Sockaddr& addr,
+                     pair<Sockaddr, string> addr_with_name,
                      const MonoTime& deadline,
                      std::shared_ptr<rpc::Messenger> messenger,
                      CredentialsPolicy creds_policy,
@@ -84,7 +88,9 @@ class ConnectToMasterRpc : public rpc::Rpc {
   virtual void SendRpcCb(const Status& status) OVERRIDE;
 
   const StatusCallback user_cb_;
-  const Sockaddr addr_;
+
+  // The resolved address to try to connect to, along with its original specified hostname.
+  const pair<Sockaddr, string> addr_with_name_;
 
   // Owned by the caller of this RPC, not this instance.
   ConnectToMasterResponsePB* out_;
@@ -99,14 +105,14 @@ class ConnectToMasterRpc : public rpc::Rpc {
 
 
 ConnectToMasterRpc::ConnectToMasterRpc(StatusCallback user_cb,
-    const Sockaddr& addr,
+    pair<Sockaddr, string> addr_with_name,
     const MonoTime& deadline,
     shared_ptr<Messenger> messenger,
     rpc::CredentialsPolicy creds_policy,
     ConnectToMasterResponsePB* out)
       : Rpc(deadline, std::move(messenger)),
         user_cb_(std::move(user_cb)),
-        addr_(addr),
+        addr_with_name_(std::move(addr_with_name)),
         out_(DCHECK_NOTNULL(out)) {
   mutable_retrier()->mutable_controller()->set_credentials_policy(creds_policy);
 }
@@ -115,8 +121,7 @@ ConnectToMasterRpc::~ConnectToMasterRpc() {
 }
 
 void ConnectToMasterRpc::SendRpc() {
-  // TODO(KUDU-2032): retain the hostname for addr_
-  MasterServiceProxy proxy(retrier().messenger(), addr_, addr_.host());
+  MasterServiceProxy proxy(retrier().messenger(), addr_with_name_.first, addr_with_name_.second);
   rpc::RpcController* controller = mutable_retrier()->mutable_controller();
   // TODO(todd): should this be setting an RPC call deadline based on 'deadline'?
   // it doesn't seem to be.
@@ -137,8 +142,9 @@ void ConnectToMasterRpc::SendRpc() {
 }
 
 string ConnectToMasterRpc::ToString() const {
-  return strings::Substitute("ConnectToMasterRpc(address: $0, num_attempts: $1)",
-                             addr_.ToString(), num_attempts());
+  return strings::Substitute("ConnectToMasterRpc(address: $0:$1, num_attempts: $2)",
+                             addr_with_name_.second, addr_with_name_.first.port(),
+                             num_attempts());
 }
 
 void ConnectToMasterRpc::SendRpcCb(const Status& status) {
@@ -198,21 +204,21 @@ void ConnectToMasterRpc::SendRpcCb(const Status& status) {
 ////////////////////////////////////////////////////////////
 
 ConnectToClusterRpc::ConnectToClusterRpc(LeaderCallback user_cb,
-                                         vector<Sockaddr> addrs,
+                                         vector<pair<Sockaddr, string>> addrs_with_names,
                                          MonoTime deadline,
                                          MonoDelta rpc_timeout,
                                          shared_ptr<Messenger> messenger,
                                          rpc::CredentialsPolicy creds_policy)
     : Rpc(deadline, std::move(messenger)),
       user_cb_(std::move(user_cb)),
-      addrs_(std::move(addrs)),
+      addrs_with_names_(std::move(addrs_with_names)),
       rpc_timeout_(rpc_timeout),
       pending_responses_(0),
       completed_(false) {
   DCHECK(deadline.Initialized());
 
   // Using resize instead of reserve to explicitly initialized the values.
-  responses_.resize(addrs_.size());
+  responses_.resize(addrs_with_names_.size());
   mutable_retrier()->mutable_controller()->set_credentials_policy(creds_policy);
 }
 
@@ -220,12 +226,13 @@ ConnectToClusterRpc::~ConnectToClusterRpc() {
 }
 
 string ConnectToClusterRpc::ToString() const {
-  vector<string> sockaddr_str;
-  for (const Sockaddr& addr : addrs_) {
-    sockaddr_str.push_back(addr.ToString());
+  vector<string> addrs_str;
+  for (const auto& addr_with_name : addrs_with_names_) {
+    addrs_str.emplace_back(Substitute(
+        "$0:$1", addr_with_name.second, addr_with_name.first.port()));
   }
   return strings::Substitute("ConnectToClusterRpc(addrs: $0, num_attempts: $1)",
-                             JoinStrings(sockaddr_str, ","),
+                             JoinStrings(addrs_str, ","),
                              num_attempts());
 }
 
@@ -236,10 +243,10 @@ void ConnectToClusterRpc::SendRpc() {
                                                 rpc_deadline);
 
   std::lock_guard<simple_spinlock> l(lock_);
-  for (int i = 0; i < addrs_.size(); i++) {
+  for (int i = 0; i < addrs_with_names_.size(); i++) {
     ConnectToMasterRpc* rpc = new ConnectToMasterRpc(
         Bind(&ConnectToClusterRpc::SingleNodeCallback, this, i),
-        addrs_[i],
+        addrs_with_names_[i],
         actual_deadline,
         retrier().messenger(),
         retrier().controller().credentials_policy(),
@@ -279,7 +286,7 @@ void ConnectToClusterRpc::SendRpcCb(const Status& status) {
   // We are not retrying.
   undo_completed.cancel();
   if (leader_idx_ != -1) {
-    user_cb_(status, addrs_[leader_idx_], responses_[leader_idx_]);
+    user_cb_(status, addrs_with_names_[leader_idx_], responses_[leader_idx_]);
   } else {
     user_cb_(status, {}, {});
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/acc8fe47/src/kudu/client/master_rpc.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/master_rpc.h b/src/kudu/client/master_rpc.h
index b57d137..f002b61 100644
--- a/src/kudu/client/master_rpc.h
+++ b/src/kudu/client/master_rpc.h
@@ -19,8 +19,10 @@
 #pragma once
 
 #include <functional>
-#include <vector>
+#include <memory>
 #include <string>
+#include <utility>
+#include <vector>
 
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/master/master.pb.h"
@@ -67,7 +69,7 @@ class ConnectToClusterRpc : public rpc::Rpc,
  public:
   typedef std::function<void(
       const Status& status,
-      const Sockaddr& leader_master,
+      const std::pair<Sockaddr, std::string> leader_master,
       const master::ConnectToMasterResponsePB& connect_response)> LeaderCallback;
   // The host and port of the leader master server is stored in
   // 'leader_master', which must remain valid for the lifetime of this
@@ -77,7 +79,7 @@ class ConnectToClusterRpc : public rpc::Rpc,
   // until 'deadline' passes. Each RPC has 'rpc_timeout' time to complete
   // before it times out and may be retried if 'deadline' has not yet passed.
   ConnectToClusterRpc(LeaderCallback user_cb,
-                      std::vector<Sockaddr> addrs,
+                      std::vector<std::pair<Sockaddr, std::string>> addrs_with_names,
                       MonoTime deadline,
                       MonoDelta rpc_timeout,
                       std::shared_ptr<rpc::Messenger> messenger,
@@ -103,8 +105,8 @@ class ConnectToClusterRpc : public rpc::Rpc,
 
   const LeaderCallback user_cb_;
 
-  // The addresses of the masters.
-  const std::vector<Sockaddr> addrs_;
+  // The addresses of the masters, along with their original specified names.
+  const std::vector<std::pair<Sockaddr, std::string>> addrs_with_names_;
 
   // The amount of time alloted to each GetMasterRegistration RPC.
   const MonoDelta rpc_timeout_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/acc8fe47/src/kudu/integration-tests/external_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster.cc b/src/kudu/integration-tests/external_mini_cluster.cc
index c968ea5..7a881a8 100644
--- a/src/kudu/integration-tests/external_mini_cluster.cc
+++ b/src/kudu/integration-tests/external_mini_cluster.cc
@@ -24,6 +24,7 @@
 #include <rapidjson/document.h>
 #include <string>
 #include <unordered_set>
+#include <utility>
 
 #include "kudu/client/client.h"
 #include "kudu/client/master_rpc.h"
@@ -62,6 +63,7 @@ using kudu::tserver::ListTabletsRequestPB;
 using kudu::tserver::ListTabletsResponsePB;
 using kudu::tserver::TabletServerServiceProxy;
 using rapidjson::Value;
+using std::pair;
 using std::string;
 using std::unique_ptr;
 using std::unordered_set;
@@ -492,23 +494,23 @@ Status ExternalMiniCluster::WaitForTabletsRunning(ExternalTabletServer* ts,
 Status ExternalMiniCluster::GetLeaderMasterIndex(int* idx) {
   scoped_refptr<ConnectToClusterRpc> rpc;
   Synchronizer sync;
-  vector<Sockaddr> addrs;
+  vector<pair<Sockaddr, string>> addrs_with_names;
   Sockaddr leader_master_addr;
   MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
 
   for (const scoped_refptr<ExternalMaster>& master : masters_) {
-    addrs.push_back(master->bound_rpc_addr());
+    addrs_with_names.emplace_back(master->bound_rpc_addr(), master->bound_rpc_addr().host());
   }
   const auto& cb = [&](const Status& status,
-                       const Sockaddr& leader_master,
+                       const pair<Sockaddr, string>& leader_master,
                        const master::ConnectToMasterResponsePB& resp) {
     if (status.ok()) {
-      leader_master_addr = leader_master;
+      leader_master_addr = leader_master.first;
     }
     sync.StatusCB(status);
   };
   rpc.reset(new ConnectToClusterRpc(cb,
-                                    std::move(addrs),
+                                    std::move(addrs_with_names),
                                     deadline,
                                     MonoDelta::FromSeconds(5),
                                     messenger_));


Mime
View raw message