kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject [kudu] branch master updated: Support location awareness in READ_CLOSEST for the C++ client
Date Wed, 09 Jan 2019 21:14:44 GMT
This is an automated email from the ASF dual-hosted git repository.

mpercy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 2831c91  Support location awareness in READ_CLOSEST for the C++ client
2831c91 is described below

commit 2831c91ac75c5b950dc918e80ef47750f9eaa893
Author: Will Berkeley <wdberkeley@gmail.org>
AuthorDate: Fri Jan 4 13:43:15 2019 -0800

    Support location awareness in READ_CLOSEST for the C++ client
    
    Previously, in READ_CLOSEST, the C++ client would choose to read from a
    local tablet server if possible and otherwise pick a random replica.
    This changes and enhances READ_CLOSEST mode to work as follows:
    
    1. If there is a local server, use it. If there are multiple local
       servers, choose one at random.
    2. If there is a server in the same location as the client, use it. If
       there are multiple servers in the same location, choose one at
       random.
    3. Otherwise, choose a server at random.
    
    This is not a breaking change, as in the absence of locations the
    behavior is consistent with what was documented before, which was only
    that a local server would be chosen if possible, else a random server.
    
    Change-Id: I2c6bcc7479c5cf2e17cb6e368ca89a1eb7f21713
    Reviewed-on: http://gerrit.cloudera.org:8080/12138
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Tested-by: Kudu Jenkins
    Reviewed-by: Grant Henke <granthenke@apache.org>
    Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
---
 src/kudu/client/client-internal.cc                 |  50 ++++++++---
 src/kudu/client/client-internal.h                  |   6 +-
 src/kudu/client/client.cc                          |   3 +-
 src/kudu/client/client.h                           |   7 +-
 src/kudu/client/meta_cache.cc                      |   8 +-
 src/kudu/client/meta_cache.h                       |   6 +-
 .../integration-tests/location_assignment-itest.cc | 100 +++++++++++++++++++++
 7 files changed, 156 insertions(+), 24 deletions(-)

diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index 40496c7..fda6642 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -180,7 +180,7 @@ RemoteTabletServer* KuduClient::Data::SelectTServer(const scoped_refptr<RemoteTa
     case CLOSEST_REPLICA:
     case FIRST_REPLICA: {
       rt->GetRemoteTabletServers(candidates);
-      // Filter out all the blacklisted candidates.
+      // Exclude all the blacklisted candidates.
       vector<RemoteTabletServer*> filtered;
       for (RemoteTabletServer* rts : *candidates) {
         if (!ContainsKey(blacklist, rts->permanent_uuid())) {
@@ -193,23 +193,44 @@ RemoteTabletServer* KuduClient::Data::SelectTServer(const scoped_refptr<RemoteTa
         if (!filtered.empty()) {
           ret = filtered[0];
         }
-      } else if (selection == CLOSEST_REPLICA) {
-        // Choose a local replica.
-        for (RemoteTabletServer* rts : filtered) {
-          if (IsTabletServerLocal(*rts)) {
-            ret = rts;
-            break;
-          }
+        break;
+      }
+      // Choose a replica as follows:
+      // 1. If there is a replica local to the client, pick it. If there are
+      // multiple, pick a random one.
+      // 2. Otherwise, if there is a replica in the same location, pick it. If
+      // there are multiple, pick a random one.
+      // 3. If there are no local replicas or replicas in the same location,
+      // pick a random replica.
+      // TODO(wdberkeley): Eventually, the client might use the hierarchical
+      // structure of a location to determine proximity.
+      const string client_location = location();
+      vector<RemoteTabletServer*> local;
+      vector<RemoteTabletServer*> same_location;
+      local.reserve(filtered.size());
+      same_location.reserve(filtered.size());
+      for (RemoteTabletServer* rts : filtered) {
+        if (IsTabletServerLocal(*rts)) {
+          local.push_back(rts);
         }
-        // Fallback to a random replica if none are local.
-        if (ret == nullptr && !filtered.empty()) {
-          ret = filtered[rand() % filtered.size()];
+        if (!client_location.empty()) {
+          const string replica_location = rts->location();
+          if (client_location == replica_location) {
+            same_location.push_back(rts);
+          }
         }
       }
+      if (!local.empty()) {
+        ret = local[rand() % local.size()];
+      } else if (!same_location.empty()) {
+        ret = same_location[rand() % same_location.size()];
+      } else if (!filtered.empty()) {
+        ret = filtered[rand() % filtered.size()];
+      }
       break;
     }
     default: {
-      LOG(FATAL) << "Unknown ProxySelection value " << selection;
+      LOG(FATAL) << "Unknown ReplicaSelection value " << selection;
       break;
     }
   }
@@ -635,6 +656,11 @@ vector<HostPort> KuduClient::Data::master_hostports() const {
   return master_hostports_;
 }
 
+string KuduClient::Data::location() const {
+  std::lock_guard<simple_spinlock> l(leader_master_lock_);
+  return location_;
+}
+
 shared_ptr<master::MasterServiceProxy> KuduClient::Data::master_proxy() const {
   std::lock_guard<simple_spinlock> l(leader_master_lock_);
   return master_proxy_;
diff --git a/src/kudu/client/client-internal.h b/src/kudu/client/client-internal.h
index 3c38531..85cbafc 100644
--- a/src/kudu/client/client-internal.h
+++ b/src/kudu/client/client-internal.h
@@ -147,8 +147,8 @@ class KuduClient::Data {
 
   bool IsTabletServerLocal(const internal::RemoteTabletServer& rts) const;
 
-  // Returns a non-failed replica of the specified tablet based on the provided selection criteria
-  // and tablet server blacklist.
+  // Returns a non-failed replica of the specified tablet based on the provided
+  // selection criteria and tablet server blacklist.
   //
   // Returns NULL if there are no valid tablet servers.
   internal::RemoteTabletServer* SelectTServer(
@@ -194,6 +194,8 @@ class KuduClient::Data {
 
   std::vector<HostPort> master_hostports() const;
 
+  std::string location() const;
+
   uint64_t GetLatestObservedTimestamp() const;
 
   void UpdateLatestObservedTimestamp(uint64_t timestamp);
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 74ebb09..6d8239a 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -660,8 +660,7 @@ string KuduClient::GetHiveMetastoreUuid() const {
 }
 
 string KuduClient::location() const {
-  std::lock_guard<simple_spinlock> l(data_->leader_master_lock_);
-  return data_->location_;
+  return data_->location();
 }
 
 ////////////////////////////////////////////////////////////
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 18e0cf9..09103d6 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -470,8 +470,11 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
   enum ReplicaSelection {
     LEADER_ONLY,      ///< Select the LEADER replica.
 
-    CLOSEST_REPLICA,  ///< Select the closest replica to the client,
-                      ///< or a random one if all replicas are equidistant.
+    CLOSEST_REPLICA,  ///< Select the closest replica to the client.
+                      ///< Local replicas are considered the closest,
+                      ///< followed by replicas in the same location as the
+                      ///< client, followed by all other replicas. If there are
+                      ///< multiple closest replicas, one is chosen randomly.
 
     FIRST_REPLICA     ///< Select the first replica in the list.
   };
diff --git a/src/kudu/client/meta_cache.cc b/src/kudu/client/meta_cache.cc
index 7d7701f..1ac130f 100644
--- a/src/kudu/client/meta_cache.cc
+++ b/src/kudu/client/meta_cache.cc
@@ -82,9 +82,7 @@ namespace client {
 namespace internal {
 
 RemoteTabletServer::RemoteTabletServer(const master::TSInfoPB& pb)
-  : uuid_(pb.permanent_uuid()),
-    location_(pb.location()) {
-
+  : uuid_(pb.permanent_uuid()) {
   Update(pb);
 }
 
@@ -151,13 +149,15 @@ void RemoteTabletServer::Update(const master::TSInfoPB& pb) {
   for (const HostPortPB& hostport_pb : pb.rpc_addresses()) {
     rpc_hostports_.emplace_back(hostport_pb.host(), hostport_pb.port());
   }
+  location_ = pb.location();
 }
 
 const string& RemoteTabletServer::permanent_uuid() const {
   return uuid_;
 }
 
-const string& RemoteTabletServer::location() const {
+string RemoteTabletServer::location() const {
+  std::lock_guard<simple_spinlock> l(lock_);
   return location_;
 }
 
diff --git a/src/kudu/client/meta_cache.h b/src/kudu/client/meta_cache.h
index a23165d..83c104c 100644
--- a/src/kudu/client/meta_cache.h
+++ b/src/kudu/client/meta_cache.h
@@ -106,7 +106,9 @@ class RemoteTabletServer {
   // Returns the remote server's uuid.
   const std::string& permanent_uuid() const;
 
-  const std::string& location() const;
+  // Return a copy of this tablet server's location, as assigned by the master.
+  // If no location is assigned, the returned string will be empty.
+  std::string location() const;
 
  private:
   // Internal callback for DNS resolution.
@@ -119,7 +121,7 @@ class RemoteTabletServer {
   mutable simple_spinlock lock_;
   const std::string uuid_;
   // If not assigned, location_ will be an empty string.
-  const std::string location_;
+  std::string location_;
 
   std::vector<HostPort> rpc_hostports_;
   std::shared_ptr<tserver::TabletServerServiceProxy> proxy_;
diff --git a/src/kudu/integration-tests/location_assignment-itest.cc b/src/kudu/integration-tests/location_assignment-itest.cc
index 96b3dcd..cae1fa8 100644
--- a/src/kudu/integration-tests/location_assignment-itest.cc
+++ b/src/kudu/integration-tests/location_assignment-itest.cc
@@ -15,25 +15,46 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <cstdint>
 #include <memory>
 #include <ostream>
 #include <string>
 #include <unordered_map>
 #include <utility>
+#include <vector>
 
+#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "kudu/client/client-test-util.h"
+#include "kudu/client/client.h"
+#include "kudu/client/shared_ptr.h"
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/ts_itest-base.h"
+#include "kudu/master/master.pb.h"
 #include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
 #include "kudu/util/random.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+DECLARE_int32(num_replicas);
+DECLARE_int32(num_tablet_servers);
+
+METRIC_DECLARE_counter(scans_started);
+
+METRIC_DECLARE_entity(tablet);
+
+using kudu::client::KuduClient;
+using kudu::client::KuduScanner;
 using kudu::cluster::ExternalMiniCluster;
 using kudu::cluster::ExternalMiniClusterOptions;
 using kudu::cluster::LocationInfo;
@@ -41,10 +62,89 @@ using kudu::itest::TServerDetails;
 using std::string;
 using std::unique_ptr;
 using std::unordered_map;
+using std::vector;
 using strings::Substitute;
 
 namespace kudu {
 
+class ClientLocationAssignmentITest :
+    public tserver::TabletServerIntegrationTestBase {
+};
+
+TEST_F(ClientLocationAssignmentITest, Basic) {
+  // Generate the location mapping and build the cluster. There are three
+  // locations. One location has two spots, which may be occupied by two
+  // tablet servers or a tablet server and the client.
+  LocationInfo info;
+  int client_loc_idx = random_.Uniform(FLAGS_num_tablet_servers);
+  for (int i = 0; i < FLAGS_num_tablet_servers; i++) {
+    EmplaceOrDie(&info, Substitute("/L$0", i), i == client_loc_idx ? 2 : 1);
+  }
+  FLAGS_num_replicas = FLAGS_num_tablet_servers;
+  NO_FATALS(BuildAndStart({}, {}, std::move(info)));
+
+  // Find the tablet server that is colocated with the client, if there is one.
+  const auto timeout = MonoDelta::FromSeconds(30);
+  vector<master::ListTabletServersResponsePB_Entry> tservers;
+  ASSERT_OK(itest::ListTabletServers(cluster_->master_proxy(),
+                                     timeout,
+                                     &tservers));
+  const string client_location = client_->location();
+  ASSERT_FALSE(client_location.empty());
+  string client_colocated_tserver_uuid;
+  for (const auto& tserver : tservers) {
+    const auto& ts_location = tserver.location();
+    ASSERT_FALSE(ts_location.empty());
+    if (ts_location == client_location) {
+      client_colocated_tserver_uuid = tserver.instance_id().permanent_uuid();
+    }
+  }
+
+  // Wait for each replica to have received an op. This should be the NO_OP
+  // asserting the leader's leadership in the current term. If we don't wait
+  // for a safetime-advancing op to be received, scans might be rejected for
+  // correctness reasons (see KUDU-2463). This will blacklist the replica in
+  // the client and possibly cause a scan outside of the location on retry.
+  ASSERT_OK(WaitForServersToAgree(timeout, tablet_servers_, tablet_id_, 1));
+
+  // Scan the table in CLOSEST_REPLICA mode.
+  KuduScanner scanner(table_.get());
+  ASSERT_OK(scanner.SetSelection(KuduClient::ReplicaSelection::CLOSEST_REPLICA));
+  vector<string> rows;
+  ASSERT_OK(ScanToStrings(&scanner, &rows));
+  ASSERT_TRUE(rows.empty());
+
+  // The number of scans started is the number of tablets, 1. If on Linux,
+  // check that CLOSEST_REPLICA is working as expected.
+  int64_t total_scans_started = 0;
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    const auto* tserver = cluster_->tablet_server(i);
+    int64_t scans_started;
+    ASSERT_OK(itest::GetInt64Metric(tserver->bound_http_hostport(),
+                                    &METRIC_ENTITY_tablet,
+                                    nullptr,
+                                    &METRIC_scans_started,
+                                    "value",
+                                    &scans_started));
+    total_scans_started += scans_started;
+# if defined(__linux__)
+    // When running on Linux, the client and tablet servers each have their own
+    // IP in the local address space, so no tablet server will be considered
+    // local to the client. If there is a tablet server in the same location as
+    // the client, it will be the only tablet server scanned. Otherwise, some
+    // random tablet server will be scanned.
+    if (!client_colocated_tserver_uuid.empty()) {
+      if (tserver->uuid() == client_colocated_tserver_uuid) {
+        ASSERT_EQ(1, scans_started);
+      } else {
+        ASSERT_EQ(0, scans_started);
+      }
+    }
+# endif
+  }
+  ASSERT_EQ(1, total_scans_started);
+}
+
 class TsLocationAssignmentITest :
     public KuduTest,
     public ::testing::WithParamInterface<std::tuple<int, int>> {


Mime
View raw message