kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject [kudu] branch master updated: KUDU-2791: TTL cache in DNS resolver (part 2)
Date Mon, 03 Jun 2019 04:41:45 GMT
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 48467cc  KUDU-2791: TTL cache in DNS resolver (part 2)
48467cc is described below

commit 48467ccf4c7a6135453708e3ea5123738fef19b3
Author: Alexey Serbin <alexey@apache.org>
AuthorDate: Wed May 29 23:58:20 2019 -0700

    KUDU-2791: TTL cache in DNS resolver (part 2)
    
    This changelist adds necessary plumbing in various places where
    it's beneficial to use caching DnsResolver instead of
    HostPort::ResolveAddresses().
    
    Change-Id: Ib22e87d28573fdb93ba18cd6d99d8bde7524a4fc
    Reviewed-on: http://gerrit.cloudera.org:8080/13469
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <adar@cloudera.com>
---
 src/kudu/client/client-internal.cc                 |  7 +--
 src/kudu/consensus/consensus_peers.cc              | 38 ++++++++------
 src/kudu/consensus/consensus_peers.h               | 14 +++--
 src/kudu/integration-tests/raft_consensus-itest.cc |  5 +-
 src/kudu/kserver/kserver.cc                        |  4 +-
 src/kudu/master/master_service.cc                  |  1 +
 src/kudu/master/sys_catalog.cc                     | 11 ++--
 src/kudu/master/ts_descriptor-test.cc              | 14 ++---
 src/kudu/master/ts_descriptor.cc                   | 13 +++--
 src/kudu/master/ts_descriptor.h                    |  6 ++-
 src/kudu/master/ts_manager.cc                      |  6 ++-
 src/kudu/master/ts_manager.h                       |  2 +
 src/kudu/server/server_base.cc                     |  8 +++
 src/kudu/server/server_base.h                      | 11 ++--
 src/kudu/tablet/tablet_replica-test.cc             | 61 ++++++++++++----------
 src/kudu/tablet/tablet_replica.cc                  |  5 +-
 src/kudu/tablet/tablet_replica.h                   |  4 +-
 src/kudu/tserver/heartbeater.cc                    | 45 +++++++---------
 .../tserver/tablet_copy_source_session-test.cc     | 10 ++--
 src/kudu/tserver/tablet_server.cc                  | 26 +++------
 src/kudu/tserver/ts_tablet_manager.cc              |  3 +-
 src/kudu/util/net/net_util.cc                      |  2 +-
 22 files changed, 169 insertions(+), 127 deletions(-)

diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index 842ae8f..09e42bb 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -415,8 +415,9 @@ Status KuduClient::Data::InitLocalHostNames() {
   }
 
   vector<Sockaddr> addresses;
-  RETURN_NOT_OK_PREPEND(HostPort(hostname, 0).ResolveAddresses(&addresses),
-                        Substitute("Could not resolve local host name '$0'", hostname));
+  RETURN_NOT_OK_PREPEND(dns_resolver_->ResolveAddresses(HostPort(hostname, 0),
+                                                        &addresses),
+      Substitute("Could not resolve local host name '$0'", hostname));
 
   for (const Sockaddr& addr : addresses) {
     // Similar to above, ignore local or wildcard addresses.
@@ -592,7 +593,7 @@ void KuduClient::Data::ConnectToClusterAsync(KuduClient* client,
     Status s = hp.ParseString(master_server_addr, master::Master::kDefaultPort);
     if (s.ok()) {
       // TODO(todd): Do address resolution asynchronously as well.
-      s = hp.ResolveAddresses(&addrs);
+      s = dns_resolver_->ResolveAddresses(hp, &addrs);
     }
     if (!s.ok()) {
       cb.Run(s);
diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index e3a30be..e6fd67a 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -50,6 +50,7 @@
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/net/dns_resolver.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/pb_util.h"
@@ -536,15 +537,17 @@ string RpcPeerProxy::PeerName() const {
 
 namespace {
 
-Status CreateConsensusServiceProxyForHost(const shared_ptr<Messenger>& messenger,
-                                          const HostPort& hostport,
-                                          gscoped_ptr<ConsensusServiceProxy>* new_proxy) {
+Status CreateConsensusServiceProxyForHost(
+    const HostPort& hostport,
+    const shared_ptr<Messenger>& messenger,
+    DnsResolver* dns_resolver,
+    gscoped_ptr<ConsensusServiceProxy>* new_proxy) {
   vector<Sockaddr> addrs;
-  RETURN_NOT_OK(hostport.ResolveAddresses(&addrs));
+  RETURN_NOT_OK(dns_resolver->ResolveAddresses(hostport, &addrs));
   if (addrs.size() > 1) {
-    LOG(WARNING)<< "Peer address '" << hostport.ToString() << "' "
-    << "resolves to " << addrs.size() << " different addresses. Using "
-    << addrs[0].ToString();
+    LOG(WARNING) << Substitute(
+        "Peer address '$0' resolves to $1 different addresses. "
+        "Using $2", hostport.ToString(), addrs.size(), addrs[0].ToString());
   }
   new_proxy->reset(new ConsensusServiceProxy(messenger, addrs[0], hostport.host()));
   return Status::OK();
@@ -552,28 +555,33 @@ Status CreateConsensusServiceProxyForHost(const shared_ptr<Messenger>& messenger
 
 } // anonymous namespace
 
-RpcPeerProxyFactory::RpcPeerProxyFactory(shared_ptr<Messenger> messenger)
-    : messenger_(std::move(messenger)) {}
+RpcPeerProxyFactory::RpcPeerProxyFactory(shared_ptr<Messenger> messenger,
+                                         DnsResolver* dns_resolver)
+    : messenger_(std::move(messenger)),
+      dns_resolver_(dns_resolver) {
+}
 
 Status RpcPeerProxyFactory::NewProxy(const RaftPeerPB& peer_pb,
                                      gscoped_ptr<PeerProxy>* proxy) {
   gscoped_ptr<HostPort> hostport(new HostPort);
   RETURN_NOT_OK(HostPortFromPB(peer_pb.last_known_addr(), hostport.get()));
   gscoped_ptr<ConsensusServiceProxy> new_proxy;
-  RETURN_NOT_OK(CreateConsensusServiceProxyForHost(messenger_, *hostport, &new_proxy));
+  RETURN_NOT_OK(CreateConsensusServiceProxyForHost(
+      *hostport, messenger_, dns_resolver_, &new_proxy));
   proxy->reset(new RpcPeerProxy(std::move(hostport), std::move(new_proxy)));
   return Status::OK();
 }
 
-RpcPeerProxyFactory::~RpcPeerProxyFactory() {}
-
-Status SetPermanentUuidForRemotePeer(const shared_ptr<Messenger>& messenger,
-                                     RaftPeerPB* remote_peer) {
+Status SetPermanentUuidForRemotePeer(
+    const shared_ptr<rpc::Messenger>& messenger,
+    DnsResolver* resolver,
+    RaftPeerPB* remote_peer) {
   DCHECK(!remote_peer->has_permanent_uuid());
   HostPort hostport;
   RETURN_NOT_OK(HostPortFromPB(remote_peer->last_known_addr(), &hostport));
   gscoped_ptr<ConsensusServiceProxy> proxy;
-  RETURN_NOT_OK(CreateConsensusServiceProxyForHost(messenger, hostport, &proxy));
+  RETURN_NOT_OK(CreateConsensusServiceProxyForHost(
+      hostport, messenger, resolver, &proxy));
   GetNodeInstanceRequestPB req;
   GetNodeInstanceResponsePB resp;
   rpc::RpcController controller;
diff --git a/src/kudu/consensus/consensus_peers.h b/src/kudu/consensus/consensus_peers.h
index 3bcdcb5..7379512 100644
--- a/src/kudu/consensus/consensus_peers.h
+++ b/src/kudu/consensus/consensus_peers.h
@@ -38,6 +38,7 @@
 #include "kudu/util/status.h"
 
 namespace kudu {
+class DnsResolver;
 class ThreadPoolToken;
 
 namespace rpc {
@@ -278,26 +279,29 @@ class RpcPeerProxy : public PeerProxy {
 // PeerProxyFactory implementation that generates RPCPeerProxies
 class RpcPeerProxyFactory : public PeerProxyFactory {
  public:
-  explicit RpcPeerProxyFactory(std::shared_ptr<rpc::Messenger> messenger);
+  RpcPeerProxyFactory(std::shared_ptr<rpc::Messenger> messenger,
+                      DnsResolver* dns_resolver);
+  ~RpcPeerProxyFactory() = default;
 
   Status NewProxy(const RaftPeerPB& peer_pb,
                   gscoped_ptr<PeerProxy>* proxy) override;
 
-  ~RpcPeerProxyFactory();
-
   const std::shared_ptr<rpc::Messenger>& messenger() const override {
     return messenger_;
   }
 
  private:
   std::shared_ptr<rpc::Messenger> messenger_;
+  DnsResolver* dns_resolver_;
 };
 
 // Query the consensus service at last known host/port that is
 // specified in 'remote_peer' and set the 'permanent_uuid' field based
 // on the response.
-Status SetPermanentUuidForRemotePeer(const std::shared_ptr<rpc::Messenger>& messenger,
-                                     RaftPeerPB* remote_peer);
+Status SetPermanentUuidForRemotePeer(
+    const std::shared_ptr<rpc::Messenger>& messenger,
+    DnsResolver* resolver,
+    RaftPeerPB* remote_peer);
 
 }  // namespace consensus
 }  // namespace kudu
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 6593f2b..5068e9c 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -74,6 +74,7 @@
 #include "kudu/util/env_util.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/net/dns_resolver.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/path_util.h"
 #include "kudu/util/pb_util.h"
@@ -654,7 +655,9 @@ TEST_F(RaftConsensusITest, TestGetPermanentUuid) {
   std::shared_ptr<rpc::Messenger> messenger;
   ASSERT_OK(builder.Build(&messenger));
 
-  ASSERT_OK(consensus::SetPermanentUuidForRemotePeer(messenger, &peer));
+  auto resolver = std::make_shared<DnsResolver>();
+  ASSERT_OK(consensus::SetPermanentUuidForRemotePeer(
+      messenger, resolver.get(), &peer));
   ASSERT_EQ(expected_uuid, peer.permanent_uuid());
 }
 
diff --git a/src/kudu/kserver/kserver.cc b/src/kudu/kserver/kserver.cc
index 512a7fb..985e777 100644
--- a/src/kudu/kserver/kserver.cc
+++ b/src/kudu/kserver/kserver.cc
@@ -55,13 +55,11 @@ static bool ValidateThreadPoolThreadLimit(const char* /*flagname*/, int32_t valu
 }
 DEFINE_validator(server_thread_pool_max_thread_count, &ValidateThreadPoolThreadLimit);
 
+using kudu::server::ServerBaseOptions;
 using std::string;
 using strings::Substitute;
 
 namespace kudu {
-
-using server::ServerBaseOptions;
-
 namespace kserver {
 
 METRIC_DEFINE_histogram(server, op_apply_queue_length, "Operation Apply Queue Length",
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index fadfae4..eaf5364 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -215,6 +215,7 @@ void MasterServiceImpl::TSHeartbeat(const TSHeartbeatRequestPB* req,
     }
     Status s = server_->ts_manager()->RegisterTS(req->common().ts_instance(),
                                                  req->registration(),
+                                                 server_->dns_resolver(),
                                                  &ts_desc);
     if (!s.ok()) {
       LOG(WARNING) << Substitute("Unable to register tserver ($0): $1",
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 4918f11..e90531f 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -304,10 +304,10 @@ Status SysCatalogTable::CreateDistributedConfig(const MasterOptions& options,
       LOG(INFO) << SecureShortDebugString(peer)
                 << " has no permanent_uuid. Determining permanent_uuid...";
       RaftPeerPB new_peer = peer;
-      RETURN_NOT_OK_PREPEND(consensus::SetPermanentUuidForRemotePeer(master_->messenger(),
-                                                                     &new_peer),
-                            Substitute("Unable to resolve UUID for peer $0",
-                                       SecureShortDebugString(peer)));
+      RETURN_NOT_OK_PREPEND(consensus::SetPermanentUuidForRemotePeer(
+          master_->messenger(), master_->dns_resolver(), &new_peer),
+          Substitute("Unable to resolve UUID for peer $0",
+                     SecureShortDebugString(peer)));
       resolved_config.add_peers()->CopyFrom(new_peer);
     }
   }
@@ -396,7 +396,8 @@ Status SysCatalogTable::SetupTablet(const scoped_refptr<tablet::TabletMetadata>&
                                                master_->messenger(),
                                                scoped_refptr<rpc::ResultTracker>(),
                                                log,
-                                               master_->tablet_prepare_pool()),
+                                               master_->tablet_prepare_pool(),
+                                               master_->dns_resolver()),
                         "Failed to Start() TabletReplica");
 
   tablet_replica_->RegisterMaintenanceOps(master_->maintenance_manager());
diff --git a/src/kudu/master/ts_descriptor-test.cc b/src/kudu/master/ts_descriptor-test.cc
index b2caf50..dc8acc2 100644
--- a/src/kudu/master/ts_descriptor-test.cc
+++ b/src/kudu/master/ts_descriptor-test.cc
@@ -19,7 +19,6 @@
 
 #include <memory>
 #include <string>
-#include <vector>
 
 #include <boost/optional/optional.hpp>
 #include <glog/logging.h>
@@ -27,13 +26,12 @@
 
 #include "kudu/common/common.pb.h"
 #include "kudu/common/wire_protocol.pb.h"
-#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/net/dns_resolver.h"
 #include "kudu/util/test_macros.h"
 
 using std::shared_ptr;
 using std::string;
-using std::vector;
-using strings::Substitute;
+using std::unique_ptr;
 
 namespace kudu {
 namespace master {
@@ -69,8 +67,10 @@ TEST(TSDescriptorTest, TestRegistration) {
   NodeInstancePB instance;
   ServerRegistrationPB registration;
   SetupBasicRegistrationInfo(uuid, &instance, &registration);
+  unique_ptr<DnsResolver> dns_resolver(new DnsResolver);
   shared_ptr<TSDescriptor> desc;
-  ASSERT_OK(TSDescriptor::RegisterNew(instance, registration, {}, &desc));
+  ASSERT_OK(TSDescriptor::RegisterNew(
+      instance, registration, {}, dns_resolver.get(), &desc));
 
   // Spot check some fields and the ToString value.
   ASSERT_EQ(uuid, desc->permanent_uuid());
@@ -87,8 +87,10 @@ TEST(TSDescriptorTest, TestLocationCmd) {
   NodeInstancePB instance;
   ServerRegistrationPB registration;
   SetupBasicRegistrationInfo(uuid, &instance, &registration);
+  unique_ptr<DnsResolver> dns_resolver(new DnsResolver);
   shared_ptr<TSDescriptor> desc;
-  ASSERT_OK(TSDescriptor::RegisterNew(instance, registration, location, &desc));
+  ASSERT_OK(TSDescriptor::RegisterNew(
+      instance, registration, location, dns_resolver.get(), &desc));
   ASSERT_EQ(location, desc->location());
 }
 } // namespace master
diff --git a/src/kudu/master/ts_descriptor.cc b/src/kudu/master/ts_descriptor.cc
index 5912faa..4c1c0a1 100644
--- a/src/kudu/master/ts_descriptor.cc
+++ b/src/kudu/master/ts_descriptor.cc
@@ -35,6 +35,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tserver/tserver_admin.proxy.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/net/dns_resolver.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/pb_util.h"
@@ -58,10 +59,12 @@ namespace master {
 Status TSDescriptor::RegisterNew(const NodeInstancePB& instance,
                                  const ServerRegistrationPB& registration,
                                  const boost::optional<std::string>& location,
+                                 DnsResolver* dns_resolver,
                                  shared_ptr<TSDescriptor>* desc) {
   shared_ptr<TSDescriptor> ret(TSDescriptor::make_shared(instance.permanent_uuid()));
-  RETURN_NOT_OK(ret->Register(instance, registration, location));
-  desc->swap(ret);
+  RETURN_NOT_OK(ret->Register(
+      instance, registration, location, dns_resolver));
+  *desc = std::move(ret);
   return Status::OK();
 }
 
@@ -97,7 +100,8 @@ static bool HostPortPBsEqual(const google::protobuf::RepeatedPtrField<HostPortPB
 
 Status TSDescriptor::Register(const NodeInstancePB& instance,
                               const ServerRegistrationPB& registration,
-                              const boost::optional<std::string>& location) {
+                              const boost::optional<std::string>& location,
+                              DnsResolver* dns_resolver) {
   std::lock_guard<rw_spinlock> l(lock_);
   CHECK_EQ(instance.permanent_uuid(), permanent_uuid_);
 
@@ -138,6 +142,7 @@ Status TSDescriptor::Register(const NodeInstancePB& instance,
   registration_.reset(new ServerRegistrationPB(registration));
   ts_admin_proxy_.reset();
   consensus_proxy_.reset();
+  dns_resolver_ = dns_resolver;
   location_ = location;
   return Status::OK();
 }
@@ -217,7 +222,7 @@ Status TSDescriptor::ResolveSockaddr(Sockaddr* addr, string* host) const {
   HostPort last_hostport;
   vector<Sockaddr> addrs;
   for (const HostPort& hostport : hostports) {
-    RETURN_NOT_OK(hostport.ResolveAddresses(&addrs));
+    RETURN_NOT_OK(dns_resolver_->ResolveAddresses(hostport, &addrs));
     if (!addrs.empty()) {
       last_hostport = hostport;
       break;
diff --git a/src/kudu/master/ts_descriptor.h b/src/kudu/master/ts_descriptor.h
index ab58d33..0b64019 100644
--- a/src/kudu/master/ts_descriptor.h
+++ b/src/kudu/master/ts_descriptor.h
@@ -37,6 +37,7 @@
 
 namespace kudu {
 
+class DnsResolver;
 class Sockaddr;
 
 namespace consensus {
@@ -62,6 +63,7 @@ class TSDescriptor : public enable_make_shared<TSDescriptor> {
   static Status RegisterNew(const NodeInstancePB& instance,
                             const ServerRegistrationPB& registration,
                             const boost::optional<std::string>& location,
+                            DnsResolver* dns_resolver,
                             std::shared_ptr<TSDescriptor>* desc);
 
   virtual ~TSDescriptor() = default;
@@ -79,7 +81,8 @@ class TSDescriptor : public enable_make_shared<TSDescriptor> {
   // Register this tablet server.
   Status Register(const NodeInstancePB& instance,
                   const ServerRegistrationPB& registration,
-                  const boost::optional<std::string>& location);
+                  const boost::optional<std::string>& location,
+                  DnsResolver* dns_resolver);
 
   const std::string &permanent_uuid() const { return permanent_uuid_; }
   int64_t latest_seqno() const;
@@ -171,6 +174,7 @@ class TSDescriptor : public enable_make_shared<TSDescriptor> {
 
   std::shared_ptr<tserver::TabletServerAdminServiceProxy> ts_admin_proxy_;
   std::shared_ptr<consensus::ConsensusServiceProxy> consensus_proxy_;
+  DnsResolver* dns_resolver_;
 
   DISALLOW_COPY_AND_ASSIGN(TSDescriptor);
 };
diff --git a/src/kudu/master/ts_manager.cc b/src/kudu/master/ts_manager.cc
index 92a9b09..de4a1ac 100644
--- a/src/kudu/master/ts_manager.cc
+++ b/src/kudu/master/ts_manager.cc
@@ -102,6 +102,7 @@ bool TSManager::LookupTSByUUID(const string& uuid,
 
 Status TSManager::RegisterTS(const NodeInstancePB& instance,
                              const ServerRegistrationPB& registration,
+                             DnsResolver* dns_resolver,
                              std::shared_ptr<TSDescriptor>* desc) {
   // Pre-condition: registration info should contain at least one RPC end-point.
   if (registration.rpc_addresses().empty()) {
@@ -148,10 +149,11 @@ Status TSManager::RegisterTS(const NodeInstancePB& instance,
     auto* descriptor_ptr = FindOrNull(servers_by_id_, uuid);
     if (descriptor_ptr) {
       descriptor = *descriptor_ptr;
-      RETURN_NOT_OK(descriptor->Register(instance, registration, location));
+      RETURN_NOT_OK(descriptor->Register(
+          instance, registration, location, dns_resolver));
     } else {
       RETURN_NOT_OK(TSDescriptor::RegisterNew(
-          instance, registration, location, &descriptor));
+          instance, registration, location, dns_resolver, &descriptor));
       InsertOrDie(&servers_by_id_, uuid, descriptor);
       new_tserver = true;
     }
diff --git a/src/kudu/master/ts_manager.h b/src/kudu/master/ts_manager.h
index bf52a9c..359432a 100644
--- a/src/kudu/master/ts_manager.h
+++ b/src/kudu/master/ts_manager.h
@@ -30,6 +30,7 @@
 
 namespace kudu {
 
+class DnsResolver;
 class NodeInstancePB;
 class ServerRegistrationPB;
 
@@ -74,6 +75,7 @@ class TSManager {
   // If successful, *desc reset to the registered descriptor.
   Status RegisterTS(const NodeInstancePB& instance,
                     const ServerRegistrationPB& registration,
+                    DnsResolver* dns_resolver,
                     std::shared_ptr<TSDescriptor>* desc);
 
   // Return all of the currently registered TS descriptors into the provided
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index 8894373..7a57f33 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -74,6 +74,7 @@
 #include "kudu/util/metrics.h"
 #include "kudu/util/minidump.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/net/dns_resolver.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/pb_util.h"
@@ -206,6 +207,9 @@ DEFINE_int32(rpc_default_keepalive_time_ms, 65000,
 TAG_FLAG(rpc_default_keepalive_time_ms, advanced);
 
 DECLARE_bool(use_hybrid_clock);
+DECLARE_int32(dns_resolver_max_threads_num);
+DECLARE_uint32(dns_resolver_cache_capacity_mb);
+DECLARE_uint32(dns_resolver_cache_ttl_sec);
 
 using kudu::security::RpcAuthentication;
 using kudu::security::RpcEncryption;
@@ -355,6 +359,10 @@ ServerBase::ServerBase(string name, const ServerBaseOptions& options,
       result_tracker_(new rpc::ResultTracker(shared_ptr<MemTracker>(
           MemTracker::CreateTracker(-1, "result-tracker", mem_tracker_)))),
       is_first_run_(false),
+      dns_resolver_(new DnsResolver(
+          FLAGS_dns_resolver_max_threads_num,
+          FLAGS_dns_resolver_cache_capacity_mb * 1024 * 1024,
+          MonoDelta::FromSeconds(FLAGS_dns_resolver_cache_ttl_sec))),
       options_(options),
       stop_background_threads_latch_(1) {
   FsManagerOpts fs_opts;
diff --git a/src/kudu/server/server_base.h b/src/kudu/server/server_base.h
index 203dd99..a2aee01 100644
--- a/src/kudu/server/server_base.h
+++ b/src/kudu/server/server_base.h
@@ -14,8 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_SERVER_SERVER_BASE_H
-#define KUDU_SERVER_SERVER_BASE_H
+#pragma once
 
 #include <cstdint>
 #include <memory>
@@ -32,6 +31,7 @@
 
 namespace kudu {
 
+class DnsResolver;
 class FsManager;
 class MemTracker;
 class MetricEntity;
@@ -104,6 +104,8 @@ class ServerBase {
   // Returns this server's clock.
   clock::Clock* clock() { return clock_.get(); }
 
+  DnsResolver* dns_resolver() { return dns_resolver_.get(); }
+
   // Return a PB describing the status of the server (version info, bound ports, etc)
   Status GetStatusPB(ServerStatusPB* status) const;
 
@@ -193,6 +195,7 @@ class ServerBase {
 
   // The ACL of users who may act as part of the Kudu service.
   security::SimpleAcl service_acl_;
+
  private:
   Status InitAcls();
   void GenerateInstanceID();
@@ -209,6 +212,9 @@ class ServerBase {
   Status StartExcessLogFileDeleterThread();
   void ExcessLogFileDeleterThread();
 
+  // Utility object for DNS name resolutions.
+  std::unique_ptr<DnsResolver> dns_resolver_;
+
   ServerBaseOptions options_;
 
   std::unique_ptr<DiagnosticsLog> diag_log_;
@@ -222,4 +228,3 @@ class ServerBase {
 
 } // namespace server
 } // namespace kudu
-#endif /* KUDU_SERVER_SERVER_BASE_H */
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index a9f7382..9cf08d7 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -71,6 +71,7 @@
 #include "kudu/util/maintenance_manager.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/net/dns_resolver.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
@@ -81,35 +82,35 @@ METRIC_DECLARE_entity(tablet);
 
 DECLARE_int32(flush_threshold_mb);
 
+using kudu::consensus::CommitMsg;
+using kudu::consensus::ConsensusBootstrapInfo;
+using kudu::consensus::ConsensusMetadata;
+using kudu::consensus::ConsensusMetadataManager;
+using kudu::consensus::OpId;
+using kudu::consensus::RECEIVED_OPID;
+using kudu::consensus::RaftConfigPB;
+using kudu::consensus::RaftConsensus;
+using kudu::consensus::RaftPeerPB;
+using kudu::log::Log;
+using kudu::log::LogOptions;
+using kudu::pb_util::SecureDebugString;
+using kudu::pb_util::SecureShortDebugString;
+using kudu::rpc::Messenger;
+using kudu::rpc::ResultTracker;
+using kudu::tserver::AlterSchemaRequestPB;
+using kudu::tserver::AlterSchemaResponsePB;
+using kudu::tserver::WriteRequestPB;
+using kudu::tserver::WriteResponsePB;
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+
 namespace kudu {
 
 class MemTracker;
 
 namespace tablet {
 
-using consensus::CommitMsg;
-using consensus::ConsensusBootstrapInfo;
-using consensus::ConsensusMetadata;
-using consensus::ConsensusMetadataManager;
-using consensus::OpId;
-using consensus::RECEIVED_OPID;
-using consensus::RaftConfigPB;
-using consensus::RaftConsensus;
-using consensus::RaftPeerPB;
-using log::Log;
-using log::LogOptions;
-using pb_util::SecureDebugString;
-using pb_util::SecureShortDebugString;
-using rpc::Messenger;
-using rpc::ResultTracker;
-using std::shared_ptr;
-using std::string;
-using std::unique_ptr;
-using tserver::AlterSchemaRequestPB;
-using tserver::AlterSchemaResponsePB;
-using tserver::WriteRequestPB;
-using tserver::WriteResponsePB;
-
 static Schema GetTestSchema() {
   return Schema({ ColumnSchema("key", INT32) }, 1);
 }
@@ -117,9 +118,10 @@ static Schema GetTestSchema() {
 class TabletReplicaTest : public KuduTabletTest {
  public:
   TabletReplicaTest()
-    : KuduTabletTest(GetTestSchema()),
-      insert_counter_(0),
-      delete_counter_(0) {
+      : KuduTabletTest(GetTestSchema()),
+        insert_counter_(0),
+        delete_counter_(0),
+        dns_resolver_(new DnsResolver) {
   }
 
   void SetUpReplica(bool new_replica = true) {
@@ -184,7 +186,8 @@ class TabletReplicaTest : public KuduTabletTest {
                                   messenger_,
                                   scoped_refptr<ResultTracker>(),
                                   log,
-                                  prepare_pool_.get());
+                                  prepare_pool_.get(),
+                                  dns_resolver_.get());
   }
 
   Status StartReplicaAndWaitUntilLeader(const ConsensusBootstrapInfo& info) {
@@ -232,7 +235,8 @@ class TabletReplicaTest : public KuduTabletTest {
                                      messenger_,
                                      scoped_refptr<ResultTracker>(),
                                      log,
-                                     prepare_pool_.get()));
+                                     prepare_pool_.get(),
+                                     dns_resolver_.get()));
     // Wait for the replica to be usable.
     const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
     ASSERT_OK(tablet_replica_->consensus()->WaitUntilLeaderForTests(kTimeout));
@@ -383,6 +387,7 @@ class TabletReplicaTest : public KuduTabletTest {
   gscoped_ptr<ThreadPool> prepare_pool_;
   gscoped_ptr<ThreadPool> apply_pool_;
   gscoped_ptr<ThreadPool> raft_pool_;
+  unique_ptr<DnsResolver> dns_resolver_;
 
   scoped_refptr<ConsensusMetadataManager> cmeta_manager_;
 
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index a284215..f9c4000 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -164,7 +164,8 @@ Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info,
                             shared_ptr<Messenger> messenger,
                             scoped_refptr<ResultTracker> result_tracker,
                             scoped_refptr<Log> log,
-                            ThreadPool* prepare_pool) {
+                            ThreadPool* prepare_pool,
+                            DnsResolver* resolver) {
   DCHECK(tablet) << "A TabletReplica must be provided with a Tablet";
   DCHECK(log) << "A TabletReplica must be provided with a Log";
 
@@ -210,7 +211,7 @@ Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info,
       VLOG(2) << "T " << tablet_id() << " P " << consensus_->peer_uuid() << ": Peer starting";
       VLOG(2) << "RaftConfig before starting: " << SecureDebugString(consensus_->CommittedConfig());
 
-      peer_proxy_factory.reset(new RpcPeerProxyFactory(messenger_));
+      peer_proxy_factory.reset(new RpcPeerProxyFactory(messenger_, resolver));
       time_manager.reset(new TimeManager(clock_, tablet_->mvcc_manager()->GetCleanTimestamp()));
     }
 
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index 50fab2d..1bcffa7 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -47,6 +47,7 @@
 #include "kudu/util/status.h"
 
 namespace kudu {
+class DnsResolver;
 class MaintenanceManager;
 class MaintenanceOp;
 class MonoDelta;
@@ -108,7 +109,8 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
                std::shared_ptr<rpc::Messenger> messenger,
                scoped_refptr<rpc::ResultTracker> result_tracker,
                scoped_refptr<log::Log> log,
-               ThreadPool* prepare_pool);
+               ThreadPool* prepare_pool,
+               DnsResolver* resolver);
 
   // Synchronously transition this replica to STOPPED state from any other
   // state. This also stops RaftConsensus. If a Stop() operation is already in
diff --git a/src/kudu/tserver/heartbeater.cc b/src/kudu/tserver/heartbeater.cc
index 62856fd..b21a002 100644
--- a/src/kudu/tserver/heartbeater.cc
+++ b/src/kudu/tserver/heartbeater.cc
@@ -59,6 +59,7 @@
 #include "kudu/util/logging.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/mutex.h"
+#include "kudu/util/net/dns_resolver.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/pb_util.h"
@@ -116,31 +117,8 @@ using strings::Substitute;
 
 namespace kudu {
 
-namespace rpc {
-class Messenger;
-}
-
 namespace tserver {
 
-namespace {
-
-// Creates a proxy to 'hostport'.
-Status MasterServiceProxyForHostPort(const HostPort& hostport,
-                                     const shared_ptr<rpc::Messenger>& messenger,
-                                     gscoped_ptr<MasterServiceProxy>* proxy) {
-  vector<Sockaddr> addrs;
-  RETURN_NOT_OK(hostport.ResolveAddresses(&addrs));
-  if (addrs.size() > 1) {
-    LOG(WARNING) << "Master address '" << hostport.ToString() << "' "
-                 << "resolves to " << addrs.size() << " different addresses. Using "
-                 << addrs[0].ToString();
-  }
-  proxy->reset(new MasterServiceProxy(messenger, addrs[0], hostport.host()));
-  return Status::OK();
-}
-
-} // anonymous namespace
-
 // Most of the actual logic of the heartbeater is inside this inner class,
 // to avoid having too many dependencies from the header itself.
 //
@@ -170,6 +148,8 @@ class Heartbeater::Thread {
   Status SetupRegistration(ServerRegistrationPB* reg);
   void SetupCommonField(master::TSToMasterCommonPB* common);
   bool IsCurrentThread() const;
+  // Creates a proxy to 'hostport'.
+  Status MasterServiceProxyForHostPort(gscoped_ptr<MasterServiceProxy>* proxy);
 
   // The host and port of the master that this thread will heartbeat to.
   //
@@ -333,8 +313,7 @@ Heartbeater::Thread::Thread(HostPort master_address, TabletServer* server)
 
 Status Heartbeater::Thread::ConnectToMaster() {
   gscoped_ptr<MasterServiceProxy> new_proxy;
-  RETURN_NOT_OK(MasterServiceProxyForHostPort(master_address_, server_->messenger(), &new_proxy));
-
+  RETURN_NOT_OK(MasterServiceProxyForHostPort(&new_proxy));
   // Ping the master to verify that it's alive.
   master::PingRequestPB req;
   master::PingResponsePB resp;
@@ -727,5 +706,21 @@ void Heartbeater::Thread::GenerateFullTabletReport(TabletReportPB* report) {
   server_->tablet_manager()->PopulateFullTabletReport(report);
 }
 
+Status Heartbeater::Thread::MasterServiceProxyForHostPort(
+    gscoped_ptr<MasterServiceProxy>* proxy) {
+  vector<Sockaddr> addrs;
+  RETURN_NOT_OK(server_->dns_resolver()->ResolveAddresses(master_address_,
+                                                          &addrs));
+  CHECK(!addrs.empty());
+  if (addrs.size() > 1) {
+    LOG(WARNING) << Substitute(
+        "Master address '$0' resolves to $1 different addresses. Using $2",
+        master_address_.ToString(), addrs.size(), addrs[0].ToString());
+  }
+  proxy->reset(new MasterServiceProxy(
+      server_->messenger(), addrs[0], master_address_.host()));
+  return Status::OK();
+}
+
 } // namespace tserver
 } // namespace kudu
diff --git a/src/kudu/tserver/tablet_copy_source_session-test.cc b/src/kudu/tserver/tablet_copy_source_session-test.cc
index 0633312..fa18b0a 100644
--- a/src/kudu/tserver/tablet_copy_source_session-test.cc
+++ b/src/kudu/tserver/tablet_copy_source_session-test.cc
@@ -66,6 +66,7 @@
 #include "kudu/util/faststring.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/net/dns_resolver.h"
 #include "kudu/util/path_util.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/slice.h"
@@ -109,8 +110,9 @@ using tablet::WriteTransactionState;
 class TabletCopyTest : public KuduTabletTest {
  public:
   TabletCopyTest()
-    : KuduTabletTest(Schema({ ColumnSchema("key", STRING),
-                              ColumnSchema("val", INT32) }, 1)) {
+      : KuduTabletTest(Schema({ ColumnSchema("key", STRING),
+                                ColumnSchema("val", INT32) }, 1)),
+        dns_resolver_(new DnsResolver) {
     CHECK_OK(ThreadPoolBuilder("prepare").Build(&prepare_pool_));
     CHECK_OK(ThreadPoolBuilder("apply").Build(&apply_pool_));
     CHECK_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));
@@ -177,7 +179,8 @@ class TabletCopyTest : public KuduTabletTest {
                                      messenger,
                                      scoped_refptr<rpc::ResultTracker>(),
                                      log,
-                                     prepare_pool_.get()));
+                                     prepare_pool_.get(),
+                                     dns_resolver_.get()));
     ASSERT_OK(tablet_replica_->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10)));
     ASSERT_OK(tablet_replica_->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
   }
@@ -258,6 +261,7 @@ class TabletCopyTest : public KuduTabletTest {
   gscoped_ptr<ThreadPool> prepare_pool_;
   gscoped_ptr<ThreadPool> apply_pool_;
   gscoped_ptr<ThreadPool> raft_pool_;
+  unique_ptr<DnsResolver> dns_resolver_;
   scoped_refptr<TabletReplica> tablet_replica_;
   scoped_refptr<TabletCopySourceSession> session_;
 };
diff --git a/src/kudu/tserver/tablet_server.cc b/src/kudu/tserver/tablet_server.cc
index 5924c17..b7a5356 100644
--- a/src/kudu/tserver/tablet_server.cc
+++ b/src/kudu/tserver/tablet_server.cc
@@ -38,6 +38,7 @@
 #include "kudu/tserver/ts_tablet_manager.h"
 #include "kudu/tserver/tserver_path_handlers.h"
 #include "kudu/util/maintenance_manager.h"
+#include "kudu/util/net/dns_resolver.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/status.h"
 
@@ -47,21 +48,6 @@ using kudu::fs::ErrorHandlerType;
 using kudu::rpc::ServiceIf;
 
 namespace kudu {
-
-namespace {
-
-Status ValidateMasterAddressResolution(const UnorderedHostPortSet& master_addrs) {
-  for (const HostPort& addr : master_addrs) {
-    RETURN_NOT_OK_PREPEND(addr.ResolveAddresses(nullptr),
-                          strings::Substitute(
-                          "Couldn't resolve master service address '$0'",
-                          addr.ToString()));
-  }
-  return Status::OK();
-}
-
-} // anonymous namespace
-
 namespace tserver {
 
 TabletServer::TabletServer(const TabletServerOptions& opts)
@@ -105,15 +91,19 @@ Status TabletServer::Init() {
   // We don't validate that we can connect at this point -- it should
   // be allowed to start the TS and the master in whichever order --
   // our heartbeat thread will loop until successfully connecting.
-  RETURN_NOT_OK(ValidateMasterAddressResolution(master_addrs));
+  for (const auto& addr : master_addrs) {
+    RETURN_NOT_OK_PREPEND(dns_resolver()->ResolveAddresses(addr, nullptr),
+        strings::Substitute("couldn't resolve master service address '$0'",
+                            addr.ToString()));
+  }
 
   RETURN_NOT_OK(KuduServer::Init());
   if (web_server_) {
     RETURN_NOT_OK(path_handlers_->Register(web_server_.get()));
   }
 
-  maintenance_manager_.reset(new MaintenanceManager(
-      MaintenanceManager::kDefaultOptions, fs_manager_->uuid()));
+  maintenance_manager_ = std::make_shared<MaintenanceManager>(
+      MaintenanceManager::kDefaultOptions, fs_manager_->uuid());
 
   heartbeater_.reset(new Heartbeater(std::move(master_addrs), this));
 
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 2dc868e..9e83b50 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -1086,7 +1086,8 @@ void TSTabletManager::OpenTablet(const scoped_refptr<TabletReplica>& replica,
                        server_->messenger(),
                        server_->result_tracker(),
                        log,
-                       server_->tablet_prepare_pool());
+                       server_->tablet_prepare_pool(),
+                       server_->dns_resolver());
     if (!s.ok()) {
       LOG(ERROR) << LogPrefix(tablet_id) << "Tablet failed to start: "
                  << s.ToString();
diff --git a/src/kudu/util/net/net_util.cc b/src/kudu/util/net/net_util.cc
index b6bb89a..ae35456 100644
--- a/src/kudu/util/net/net_util.cc
+++ b/src/kudu/util/net/net_util.cc
@@ -83,7 +83,7 @@ namespace {
 
 using AddrInfo = unique_ptr<addrinfo, function<void(addrinfo*)>>;
 
-// An utility wrapper around getaddrinfo() call to convert the return code
+// A utility wrapper around getaddrinfo() call to convert the return code
 // of the libc library function into Status.
 Status GetAddrInfo(const string& hostname,
                    const addrinfo& hints,


Mime
View raw message