Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0151F200D15 for ; Thu, 5 Oct 2017 19:42:45 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 00442160BDA; Thu, 5 Oct 2017 17:42:45 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EB2C21609E2 for ; Thu, 5 Oct 2017 19:42:43 +0200 (CEST) Received: (qmail 96581 invoked by uid 500); 5 Oct 2017 17:42:43 -0000 Mailing-List: contact commits-help@kudu.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kudu.apache.org Delivered-To: mailing list commits@kudu.apache.org Received: (qmail 96572 invoked by uid 99); 5 Oct 2017 17:42:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Oct 2017 17:42:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D3EB9F5C74; Thu, 5 Oct 2017 17:42:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: todd@apache.org To: commits@kudu.apache.org Date: Thu, 05 Oct 2017 17:42:43 -0000 Message-Id: <22823d12cfbd46db9754c16676f5b47f@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] kudu git commit: KUDU-2032 (part 2): propagate master hostnames into client archived-at: Thu, 05 Oct 2017 17:42:45 -0000 KUDU-2032 (part 2): propagate master hostnames into client This changes the client code to remember the user-specified master addresses and propagate them into the creation of master proxies. It's not possible to reproduce the necessary DNS configurations in a minicluster test, but with this patch I am now able to use 'kudu perf loadgen' against a Kerberized cluster even when my local krb5.conf has rdns=false. Change-Id: I4bd299f72907a0d9442dd3eb45fcbaa2de4f73ef Reviewed-on: http://gerrit.cloudera.org:8080/8186 Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/acc8fe47 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/acc8fe47 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/acc8fe47 Branch: refs/heads/branch-1.4.x Commit: acc8fe4734b572764be867083b024733e8959d33 Parents: 0fd6b4e Author: Todd Lipcon Authored: Wed Aug 16 17:44:39 2017 -0700 Committer: Will Berkeley Committed: Thu Oct 5 05:35:08 2017 +0000 ---------------------------------------------------------------------- src/kudu/client/client-internal.cc | 27 +++++++----- src/kudu/client/client-internal.h | 3 +- src/kudu/client/master_rpc.cc | 43 ++++++++++++-------- src/kudu/client/master_rpc.h | 12 +++--- .../integration-tests/external_mini_cluster.cc | 12 +++--- 5 files changed, 58 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/acc8fe47/src/kudu/client/client-internal.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc index f3d3526..63535d4 100644 --- a/src/kudu/client/client-internal.cc +++ b/src/kudu/client/client-internal.cc @@ -23,6 +23,7 @@ #include #include #include +#include #include #include "kudu/client/master_rpc.h" @@ -50,6 +51,7 @@ #include "kudu/util/pb_util.h" #include "kudu/util/thread_restrictions.h" +using std::pair; using std::set; using std::shared_ptr; using std::string; @@ -625,10 +627,13 @@ Status KuduClient::Data::GetTableSchema(KuduClient* client, void KuduClient::Data::ConnectedToClusterCb( const Status& status, - const Sockaddr& leader_addr, + const pair& leader_addr_and_name, const master::ConnectToMasterResponsePB& connect_response, CredentialsPolicy cred_policy) { + const auto& leader_addr = leader_addr_and_name.first; + const auto& leader_hostname = leader_addr_and_name.second; + // Ensure that all of the CAs reported by the master are trusted // in our local TLS configuration. if (status.ok()) { @@ -669,9 +674,8 @@ void KuduClient::Data::ConnectedToClusterCb( } if (status.ok()) { - leader_master_hostport_ = HostPort(leader_addr); - // TODO(KUDU-2032): retain the original hostname passed by caller. - master_proxy_.reset(new MasterServiceProxy(messenger_, leader_addr, leader_addr.host())); + leader_master_hostport_ = HostPort(leader_hostname, leader_addr.port()); + master_proxy_.reset(new MasterServiceProxy(messenger_, leader_addr, leader_hostname)); } } @@ -694,12 +698,15 @@ void KuduClient::Data::ConnectToClusterAsync(KuduClient* client, CredentialsPolicy creds_policy) { DCHECK(deadline.Initialized()); - vector master_sockaddrs; + vector> master_addrs_with_names; for (const string& master_server_addr : master_server_addrs_) { vector addrs; - Status s; - // TODO: Do address resolution asynchronously as well. - s = ParseAddressList(master_server_addr, master::Master::kDefaultPort, &addrs); + HostPort hp; + Status s = hp.ParseString(master_server_addr, master::Master::kDefaultPort); + if (s.ok()) { + // TODO(todd): Do address resolution asynchronously as well. + s = hp.ResolveAddresses(&addrs); + } if (!s.ok()) { cb.Run(s); return; @@ -714,7 +721,7 @@ void KuduClient::Data::ConnectToClusterAsync(KuduClient* client, << "Specified master server address '" << master_server_addr << "' " << "resolved to multiple IPs. Using " << addrs[0].ToString(); } - master_sockaddrs.push_back(addrs[0]); + master_addrs_with_names.emplace_back(addrs[0], hp.host()); } // This ensures that no more than one ConnectToClusterRpc of each credentials @@ -761,7 +768,7 @@ void KuduClient::Data::ConnectToClusterAsync(KuduClient* client, std::placeholders::_2, std::placeholders::_3, creds_policy), - std::move(master_sockaddrs), + std::move(master_addrs_with_names), deadline, client->default_rpc_timeout(), messenger_, http://git-wip-us.apache.org/repos/asf/kudu/blob/acc8fe47/src/kudu/client/client-internal.h ---------------------------------------------------------------------- diff --git a/src/kudu/client/client-internal.h b/src/kudu/client/client-internal.h index a7a1434..0d75ed0 100644 --- a/src/kudu/client/client-internal.h +++ b/src/kudu/client/client-internal.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -139,7 +140,7 @@ class KuduClient::Data { // // See also: ConnectToClusterAsync. void ConnectedToClusterCb(const Status& status, - const Sockaddr& leader_addr, + const std::pair& leader_addr_and_name, const master::ConnectToMasterResponsePB& connect_response, rpc::CredentialsPolicy cred_policy); http://git-wip-us.apache.org/repos/asf/kudu/blob/acc8fe47/src/kudu/client/master_rpc.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/master_rpc.cc b/src/kudu/client/master_rpc.cc index 8d3049f..1c5f848 100644 --- a/src/kudu/client/master_rpc.cc +++ b/src/kudu/client/master_rpc.cc @@ -20,6 +20,8 @@ #include "kudu/client/master_rpc.h" #include +#include +#include #include @@ -34,6 +36,7 @@ #include "kudu/util/scoped_cleanup.h" +using std::pair; using std::shared_ptr; using std::string; using std::vector; @@ -48,6 +51,7 @@ using kudu::master::MasterServiceProxy; using kudu::rpc::CredentialsPolicy; using kudu::rpc::Messenger; using kudu::rpc::Rpc; +using strings::Substitute; namespace kudu { namespace client { @@ -68,7 +72,7 @@ class ConnectToMasterRpc : public rpc::Rpc { // // Invokes 'user_cb' upon failure or success of the RPC call. ConnectToMasterRpc(StatusCallback user_cb, - const Sockaddr& addr, + pair addr_with_name, const MonoTime& deadline, std::shared_ptr messenger, CredentialsPolicy creds_policy, @@ -84,7 +88,9 @@ class ConnectToMasterRpc : public rpc::Rpc { virtual void SendRpcCb(const Status& status) OVERRIDE; const StatusCallback user_cb_; - const Sockaddr addr_; + + // The resolved address to try to connect to, along with its original specified hostname. + const pair addr_with_name_; // Owned by the caller of this RPC, not this instance. ConnectToMasterResponsePB* out_; @@ -99,14 +105,14 @@ class ConnectToMasterRpc : public rpc::Rpc { ConnectToMasterRpc::ConnectToMasterRpc(StatusCallback user_cb, - const Sockaddr& addr, + pair addr_with_name, const MonoTime& deadline, shared_ptr messenger, rpc::CredentialsPolicy creds_policy, ConnectToMasterResponsePB* out) : Rpc(deadline, std::move(messenger)), user_cb_(std::move(user_cb)), - addr_(addr), + addr_with_name_(std::move(addr_with_name)), out_(DCHECK_NOTNULL(out)) { mutable_retrier()->mutable_controller()->set_credentials_policy(creds_policy); } @@ -115,8 +121,7 @@ ConnectToMasterRpc::~ConnectToMasterRpc() { } void ConnectToMasterRpc::SendRpc() { - // TODO(KUDU-2032): retain the hostname for addr_ - MasterServiceProxy proxy(retrier().messenger(), addr_, addr_.host()); + MasterServiceProxy proxy(retrier().messenger(), addr_with_name_.first, addr_with_name_.second); rpc::RpcController* controller = mutable_retrier()->mutable_controller(); // TODO(todd): should this be setting an RPC call deadline based on 'deadline'? // it doesn't seem to be. @@ -137,8 +142,9 @@ void ConnectToMasterRpc::SendRpc() { } string ConnectToMasterRpc::ToString() const { - return strings::Substitute("ConnectToMasterRpc(address: $0, num_attempts: $1)", - addr_.ToString(), num_attempts()); + return strings::Substitute("ConnectToMasterRpc(address: $0:$1, num_attempts: $2)", + addr_with_name_.second, addr_with_name_.first.port(), + num_attempts()); } void ConnectToMasterRpc::SendRpcCb(const Status& status) { @@ -198,21 +204,21 @@ void ConnectToMasterRpc::SendRpcCb(const Status& status) { //////////////////////////////////////////////////////////// ConnectToClusterRpc::ConnectToClusterRpc(LeaderCallback user_cb, - vector addrs, + vector> addrs_with_names, MonoTime deadline, MonoDelta rpc_timeout, shared_ptr messenger, rpc::CredentialsPolicy creds_policy) : Rpc(deadline, std::move(messenger)), user_cb_(std::move(user_cb)), - addrs_(std::move(addrs)), + addrs_with_names_(std::move(addrs_with_names)), rpc_timeout_(rpc_timeout), pending_responses_(0), completed_(false) { DCHECK(deadline.Initialized()); // Using resize instead of reserve to explicitly initialized the values. - responses_.resize(addrs_.size()); + responses_.resize(addrs_with_names_.size()); mutable_retrier()->mutable_controller()->set_credentials_policy(creds_policy); } @@ -220,12 +226,13 @@ ConnectToClusterRpc::~ConnectToClusterRpc() { } string ConnectToClusterRpc::ToString() const { - vector sockaddr_str; - for (const Sockaddr& addr : addrs_) { - sockaddr_str.push_back(addr.ToString()); + vector addrs_str; + for (const auto& addr_with_name : addrs_with_names_) { + addrs_str.emplace_back(Substitute( + "$0:$1", addr_with_name.second, addr_with_name.first.port())); } return strings::Substitute("ConnectToClusterRpc(addrs: $0, num_attempts: $1)", - JoinStrings(sockaddr_str, ","), + JoinStrings(addrs_str, ","), num_attempts()); } @@ -236,10 +243,10 @@ void ConnectToClusterRpc::SendRpc() { rpc_deadline); std::lock_guard l(lock_); - for (int i = 0; i < addrs_.size(); i++) { + for (int i = 0; i < addrs_with_names_.size(); i++) { ConnectToMasterRpc* rpc = new ConnectToMasterRpc( Bind(&ConnectToClusterRpc::SingleNodeCallback, this, i), - addrs_[i], + addrs_with_names_[i], actual_deadline, retrier().messenger(), retrier().controller().credentials_policy(), @@ -279,7 +286,7 @@ void ConnectToClusterRpc::SendRpcCb(const Status& status) { // We are not retrying. undo_completed.cancel(); if (leader_idx_ != -1) { - user_cb_(status, addrs_[leader_idx_], responses_[leader_idx_]); + user_cb_(status, addrs_with_names_[leader_idx_], responses_[leader_idx_]); } else { user_cb_(status, {}, {}); } http://git-wip-us.apache.org/repos/asf/kudu/blob/acc8fe47/src/kudu/client/master_rpc.h ---------------------------------------------------------------------- diff --git a/src/kudu/client/master_rpc.h b/src/kudu/client/master_rpc.h index b57d137..f002b61 100644 --- a/src/kudu/client/master_rpc.h +++ b/src/kudu/client/master_rpc.h @@ -19,8 +19,10 @@ #pragma once #include -#include +#include #include +#include +#include #include "kudu/gutil/ref_counted.h" #include "kudu/master/master.pb.h" @@ -67,7 +69,7 @@ class ConnectToClusterRpc : public rpc::Rpc, public: typedef std::function leader_master, const master::ConnectToMasterResponsePB& connect_response)> LeaderCallback; // The host and port of the leader master server is stored in // 'leader_master', which must remain valid for the lifetime of this @@ -77,7 +79,7 @@ class ConnectToClusterRpc : public rpc::Rpc, // until 'deadline' passes. Each RPC has 'rpc_timeout' time to complete // before it times out and may be retried if 'deadline' has not yet passed. ConnectToClusterRpc(LeaderCallback user_cb, - std::vector addrs, + std::vector> addrs_with_names, MonoTime deadline, MonoDelta rpc_timeout, std::shared_ptr messenger, @@ -103,8 +105,8 @@ class ConnectToClusterRpc : public rpc::Rpc, const LeaderCallback user_cb_; - // The addresses of the masters. - const std::vector addrs_; + // The addresses of the masters, along with their original specified names. + const std::vector> addrs_with_names_; // The amount of time alloted to each GetMasterRegistration RPC. const MonoDelta rpc_timeout_; http://git-wip-us.apache.org/repos/asf/kudu/blob/acc8fe47/src/kudu/integration-tests/external_mini_cluster.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/external_mini_cluster.cc b/src/kudu/integration-tests/external_mini_cluster.cc index c968ea5..7a881a8 100644 --- a/src/kudu/integration-tests/external_mini_cluster.cc +++ b/src/kudu/integration-tests/external_mini_cluster.cc @@ -24,6 +24,7 @@ #include #include #include +#include #include "kudu/client/client.h" #include "kudu/client/master_rpc.h" @@ -62,6 +63,7 @@ using kudu::tserver::ListTabletsRequestPB; using kudu::tserver::ListTabletsResponsePB; using kudu::tserver::TabletServerServiceProxy; using rapidjson::Value; +using std::pair; using std::string; using std::unique_ptr; using std::unordered_set; @@ -492,23 +494,23 @@ Status ExternalMiniCluster::WaitForTabletsRunning(ExternalTabletServer* ts, Status ExternalMiniCluster::GetLeaderMasterIndex(int* idx) { scoped_refptr rpc; Synchronizer sync; - vector addrs; + vector> addrs_with_names; Sockaddr leader_master_addr; MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5); for (const scoped_refptr& master : masters_) { - addrs.push_back(master->bound_rpc_addr()); + addrs_with_names.emplace_back(master->bound_rpc_addr(), master->bound_rpc_addr().host()); } const auto& cb = [&](const Status& status, - const Sockaddr& leader_master, + const pair& leader_master, const master::ConnectToMasterResponsePB& resp) { if (status.ok()) { - leader_master_addr = leader_master; + leader_master_addr = leader_master.first; } sync.StatusCB(status); }; rpc.reset(new ConnectToClusterRpc(cb, - std::move(addrs), + std::move(addrs_with_names), deadline, MonoDelta::FromSeconds(5), messenger_));