kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject [kudu] branch branch-1.9.x updated: [TSManager] don't hold lock while running LA command
Date Wed, 20 Mar 2019 04:23:11 GMT
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch branch-1.9.x
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/branch-1.9.x by this push:
     new a580507  [TSManager] don't hold lock while running LA command
a580507 is described below

commit a580507cb0c06363c68c43d8075abba5e5973911
Author: Alexey Serbin <alexey@apache.org>
AuthorDate: Wed Mar 13 00:50:08 2019 -0700

    [TSManager] don't hold lock while running LA command
    
    This patch changes how the location assignment command is run
    while a tablet server registers with the master.
    
    Prior to this patch, TSManager's lock on the registry of all
    tablet servers was held while running the location assignment
    command.  That could cause many TS registration heartbeats to
    pile up in flight while the master's service threads were waiting
    to acquire the TSManager's lock.
    
    This patch introduces changes to run the location assignment command
    without holding the TSManager's lock.  That makes the registration
    of tablet servers with masters more robust and makes the processing
    of concurrent heartbeats from tablet servers more efficient overall.
    
    Conflicts:
        src/kudu/master/ts_descriptor.cc
    
        Intermediate changelist 717349b4e was not cherry-picked,
        so still using simple_spinlock instead of rw_spinlock
        in TSDescriptor::Register().
    
    Change-Id: I59cd5f6ed19c162a7c9f9a6527e78cab782b4539
    Reviewed-on: http://gerrit.cloudera.org:8080/12749
    Tested-by: Kudu Jenkins
    Reviewed-by: Will Berkeley <wdberkeley@gmail.com>
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    (cherry picked from commit c01796cdbd32be39ad8d82ddb80d94190010e9c4)
    Reviewed-on: http://gerrit.cloudera.org:8080/12788
    Tested-by: Alexey Serbin <aserbin@cloudera.com>
    Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
---
 src/kudu/master/location_cache-test.cc | 41 +++++++++++-----
 src/kudu/master/master.cc              |  9 +++-
 src/kudu/master/ts_descriptor-test.cc  | 45 +----------------
 src/kudu/master/ts_descriptor.cc       | 73 +++++----------------------
 src/kudu/master/ts_descriptor.h        |  9 +---
 src/kudu/master/ts_manager.cc          | 90 +++++++++++++++++++++++++++-------
 6 files changed, 124 insertions(+), 143 deletions(-)

diff --git a/src/kudu/master/location_cache-test.cc b/src/kudu/master/location_cache-test.cc
index 96c1bea..81d3f27 100644
--- a/src/kudu/master/location_cache-test.cc
+++ b/src/kudu/master/location_cache-test.cc
@@ -79,6 +79,15 @@ TEST_F(LocationCacheTest, EmptyMappingCommand) {
   NO_FATALS(CheckMetrics(1, 0));
 }
 
+TEST_F(LocationCacheTest, MappingCommandNotFound) {
+  LocationCache cache("./notfound.sh", metric_entity_.get());
+  string location;
+  auto s = cache.GetLocation("na", &location);
+  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+  ASSERT_STR_CONTAINS(
+      s.ToString(), "failed to run location mapping command: ");
+}
+
 TEST_F(LocationCacheTest, MappingCommandFailureExitStatus) {
   LocationCache cache("/sbin/nologin", metric_entity_.get());
   string location;
@@ -99,18 +108,28 @@ TEST_F(LocationCacheTest, MappingCommandEmptyOutput) {
   NO_FATALS(CheckMetrics(1, 0));
 }
 
+// Bad cases where the script returns locations with disallowed characters or
+// in the wrong format.
 TEST_F(LocationCacheTest, MappingCommandReturnsInvalidLocation) {
-  const string cmd_path = JoinPathSegments(GetTestExecutableDirectory(),
-                                           "testdata/first_argument.sh");
-  const string location_mapping_cmd = Substitute("$0 invalid.location",
-                                                 cmd_path);
-  LocationCache cache(location_mapping_cmd, metric_entity_.get());
-  string location;
-  auto s = cache.GetLocation("na", &location);
-  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
-  ASSERT_STR_CONTAINS(
-      s.ToString(), "location mapping command returned invalid location");
-  NO_FATALS(CheckMetrics(1, 0));
+  const vector<string> bad_locations = {
+    "\"\"",           // Empty (doesn't begin with /).
+    "foo",            // Doesn't begin with /.
+    "/foo$",          // Contains the illegal character '$'.
+    "\"/foo /bar\"",  // Contains the illegal character ' '.
+  };
+  for (const auto& l : bad_locations) {
+    SCOPED_TRACE(Substitute("location '$0'", l));
+    const string cmd_path = JoinPathSegments(GetTestExecutableDirectory(),
+                                             "testdata/first_argument.sh");
+    const string location_mapping_cmd = Substitute("$0 $1", cmd_path, l);
+    LocationCache cache(location_mapping_cmd, metric_entity_.get());
+    string location;
+    auto s = cache.GetLocation("na", &location);
+    ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+    ASSERT_STR_CONTAINS(
+        s.ToString(), "location mapping command returned invalid location");
+  }
+  NO_FATALS(CheckMetrics(bad_locations.size(), 0));
 }
 
 TEST_F(LocationCacheTest, HappyPath) {
diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc
index f6a46d2..182b349 100644
--- a/src/kudu/master/master.cc
+++ b/src/kudu/master/master.cc
@@ -87,9 +87,16 @@ DEFINE_int64(authz_token_validity_seconds, 60 * 5,
              "validity period expires.");
 TAG_FLAG(authz_token_validity_seconds, experimental);
 
+DEFINE_string(location_mapping_cmd, "",
+              "A Unix command which takes a single argument, the IP address or "
+              "hostname of a tablet server or client, and returns the location "
+              "string for the tablet server. A location string begins with a / "
+              "and consists of /-separated tokens each of which contains only "
+              "characters from the set [a-zA-Z0-9_-.]. If the cluster is not "
+              "using location awareness features this flag should not be set.");
+
 DECLARE_bool(hive_metastore_sasl_enabled);
 DECLARE_string(keytab_file);
-DECLARE_string(location_mapping_cmd);
 
 using std::min;
 using std::shared_ptr;
diff --git a/src/kudu/master/ts_descriptor-test.cc b/src/kudu/master/ts_descriptor-test.cc
index 1593458..d464b7e 100644
--- a/src/kudu/master/ts_descriptor-test.cc
+++ b/src/kudu/master/ts_descriptor-test.cc
@@ -28,11 +28,7 @@
 #include "kudu/common/common.pb.h"
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/master/location_cache.h"
-#include "kudu/util/path_util.h"
-#include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
-#include "kudu/util/test_util.h"
 
 using std::shared_ptr;
 using std::string;
@@ -73,7 +69,7 @@ TEST(TSDescriptorTest, TestRegistration) {
   ServerRegistrationPB registration;
   SetupBasicRegistrationInfo(uuid, &instance, &registration);
   shared_ptr<TSDescriptor> desc;
-  ASSERT_OK(TSDescriptor::RegisterNew(instance, registration, nullptr, &desc));
+  ASSERT_OK(TSDescriptor::RegisterNew(instance, registration, {}, &desc));
 
   // Spot check some fields and the ToString value.
   ASSERT_EQ(uuid, desc->permanent_uuid());
@@ -84,52 +80,15 @@ TEST(TSDescriptorTest, TestRegistration) {
 }
 
 TEST(TSDescriptorTest, TestLocationCmd) {
-  const string kLocationCmdPath = JoinPathSegments(GetTestExecutableDirectory(),
-                                                   "testdata/first_argument.sh");
   // A happy case, using all allowed special characters.
   const string location = "/foo-bar0/BAAZ._9-quux";
-  const string location_cmd = Substitute("$0 $1", kLocationCmdPath, location);
-  LocationCache cache(location_cmd, nullptr);
-
   const string uuid = "test";
   NodeInstancePB instance;
   ServerRegistrationPB registration;
   SetupBasicRegistrationInfo(uuid, &instance, &registration);
   shared_ptr<TSDescriptor> desc;
-  ASSERT_OK(TSDescriptor::RegisterNew(instance, registration, &cache, &desc));
-
+  ASSERT_OK(TSDescriptor::RegisterNew(instance, registration, location, &desc));
   ASSERT_EQ(location, desc->location());
-
-  // Bad cases where the script returns locations with disallowed characters or
-  // in the wrong format.
-  const vector<string> bad_locations = {
-    "\"\"",      // Empty (doesn't begin with /).
-    "foo",       // Doesn't begin with /.
-    "/foo$",     // Contains the illegal character '$'.
-  };
-  for (const auto& location : bad_locations) {
-    const auto location_cmd = Substitute("$0 $1", kLocationCmdPath, location);
-    LocationCache cache(location_cmd, nullptr);
-    ASSERT_TRUE(desc->Register(instance, registration, &cache).IsRuntimeError());
-  }
-
-  // Bad cases where the script is invalid.
-  const vector<string> bad_cmds = {
-    // No command provided.
-    " ",
-    // Command not found.
-    "notfound.sh",
-    // Command returns no output.
-    "true",
-    // Command fails.
-    "false",
-    // Command returns too many locations (i.e. contains illegal ' ' character).
-    Substitute("echo $0 $1", "/foo", "/bar"),
-  };
-  for (const auto& cmd : bad_cmds) {
-    LocationCache cache(cmd, nullptr);
-    ASSERT_TRUE(desc->Register(instance, registration, &cache).IsRuntimeError());
-  }
 }
 } // namespace master
 } // namespace kudu
diff --git a/src/kudu/master/ts_descriptor.cc b/src/kudu/master/ts_descriptor.cc
index 208adac..23edee6 100644
--- a/src/kudu/master/ts_descriptor.cc
+++ b/src/kudu/master/ts_descriptor.cc
@@ -24,6 +24,7 @@
 #include <utility>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 
@@ -31,16 +32,12 @@
 #include "kudu/common/wire_protocol.h"
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/consensus.proxy.h"
-#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/master/location_cache.h"
 #include "kudu/tserver/tserver_admin.proxy.h"
 #include "kudu/util/flag_tags.h"
-#include "kudu/util/logging.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/pb_util.h"
-#include "kudu/util/trace.h"
 
 DEFINE_int32(tserver_unresponsive_timeout_ms, 60 * 1000,
              "The period of time that a Master can go without receiving a heartbeat from a "
@@ -48,20 +45,6 @@ DEFINE_int32(tserver_unresponsive_timeout_ms, 60 * 1000,
              "selected when assigning replicas during table creation or re-replication.");
 TAG_FLAG(tserver_unresponsive_timeout_ms, advanced);
 
-DEFINE_string(location_mapping_cmd, "",
-              "A Unix command which takes a single argument, the IP address or "
-              "hostname of a tablet server or client, and returns the location "
-              "string for the tablet server. A location string begins with a / "
-              "and consists of /-separated tokens each of which contains only "
-              "characters from the set [a-zA-Z0-9_-.]. If the cluster is not "
-              "using location awareness features this flag should not be set.");
-
-DEFINE_bool(location_mapping_by_uuid, false,
-            "Whether the location command is given tablet server identifier "
-            "instead of hostname/IP address (for tests only).");
-TAG_FLAG(location_mapping_by_uuid, hidden);
-TAG_FLAG(location_mapping_by_uuid, unsafe);
-
 using kudu::pb_util::SecureDebugString;
 using kudu::pb_util::SecureShortDebugString;
 using std::shared_ptr;
@@ -74,10 +57,10 @@ namespace master {
 
 Status TSDescriptor::RegisterNew(const NodeInstancePB& instance,
                                  const ServerRegistrationPB& registration,
-                                 LocationCache* location_cache,
+                                 const boost::optional<std::string>& location,
                                  shared_ptr<TSDescriptor>* desc) {
   shared_ptr<TSDescriptor> ret(TSDescriptor::make_shared(instance.permanent_uuid()));
-  RETURN_NOT_OK(ret->Register(instance, registration, location_cache));
+  RETURN_NOT_OK(ret->Register(instance, registration, location));
   desc->swap(ret);
   return Status::OK();
 }
@@ -112,8 +95,10 @@ static bool HostPortPBsEqual(const google::protobuf::RepeatedPtrField<HostPortPB
   return hostports1 == hostports2;
 }
 
-Status TSDescriptor::RegisterUnlocked(const NodeInstancePB& instance,
-                                      const ServerRegistrationPB& registration) {
+Status TSDescriptor::Register(const NodeInstancePB& instance,
+                              const ServerRegistrationPB& registration,
+                              const boost::optional<std::string>& location) {
+  std::lock_guard<simple_spinlock> l(lock_);
   CHECK_EQ(instance.permanent_uuid(), permanent_uuid_);
 
   // TODO(KUDU-418): we don't currently support changing RPC addresses since the
@@ -145,52 +130,15 @@ Status TSDescriptor::RegisterUnlocked(const NodeInstancePB& instance,
     // It's possible that the TS registered, but our response back to it
     // got lost, so it's trying to register again with the same sequence
     // number. That's fine.
-    LOG(INFO) << "Processing retry of TS registration from " << SecureShortDebugString(instance);
+    LOG(INFO) << "Processing retry of TS registration from "
+              << SecureShortDebugString(instance);
   }
 
   latest_seqno_ = instance.instance_seqno();
   registration_.reset(new ServerRegistrationPB(registration));
   ts_admin_proxy_.reset();
   consensus_proxy_.reset();
-  return Status::OK();
-}
-
-Status TSDescriptor::Register(const NodeInstancePB& instance,
-                              const ServerRegistrationPB& registration,
-                              LocationCache* location_cache) {
-  // Do basic registration work under the lock.
-  {
-    std::lock_guard<simple_spinlock> l(lock_);
-    RETURN_NOT_OK(RegisterUnlocked(instance, registration));
-  }
-
-  // Resolve the location outside the lock. This involves calling the location
-  // mapping script.
-  if (PREDICT_TRUE(location_cache != nullptr)) {
-    // In some test scenarios the location is assigned per tablet server UUID.
-    // That's the case when multiple (or even all) tablet servers have the same
-    // IP address for their RPC endpoint.
-    const auto& cmd_arg = FLAGS_location_mapping_by_uuid
-        ? permanent_uuid() : registration_->rpc_addresses(0).host();
-    TRACE(Substitute("tablet server $0: assigning location", permanent_uuid()));
-    string location;
-    const auto s = location_cache->GetLocation(cmd_arg, &location);
-    TRACE(Substitute(
-        "tablet server $0: assigned location '$1'", permanent_uuid(), location));
-
-    // Assign the location under the lock if location resolution succeeds. If
-    // it fails, log the error.
-    if (s.ok()) {
-      std::lock_guard<simple_spinlock> l(lock_);
-      location_.emplace(std::move(location));
-    } else {
-      KLOG_EVERY_N_SECS(ERROR, 60) << Substitute(
-          "Unable to assign location to tablet server $0: $1",
-          ToString(), s.ToString());
-      return s;
-    }
-  }
-
+  location_ = location;
   return Status::OK();
 }
 
@@ -338,6 +286,7 @@ Status TSDescriptor::GetConsensusProxy(const shared_ptr<rpc::Messenger>& messeng
 
 string TSDescriptor::ToString() const {
   std::lock_guard<simple_spinlock> l(lock_);
+  CHECK(!registration_->rpc_addresses().empty());
   const auto& addr = registration_->rpc_addresses(0);
   return Substitute("$0 ($1:$2)", permanent_uuid_, addr.host(), addr.port());
 }
diff --git a/src/kudu/master/ts_descriptor.h b/src/kudu/master/ts_descriptor.h
index 76a5fa8..7f61e20 100644
--- a/src/kudu/master/ts_descriptor.h
+++ b/src/kudu/master/ts_descriptor.h
@@ -53,8 +53,6 @@ class TabletServerAdminServiceProxy;
 
 namespace master {
 
-class LocationCache;
-
 // Master-side view of a single tablet server.
 //
 // Tracks the last heartbeat, status, instance identifier, location, etc.
@@ -63,7 +61,7 @@ class TSDescriptor : public enable_make_shared<TSDescriptor> {
  public:
   static Status RegisterNew(const NodeInstancePB& instance,
                             const ServerRegistrationPB& registration,
-                            LocationCache* location_cache,
+                            const boost::optional<std::string>& location,
                             std::shared_ptr<TSDescriptor>* desc);
 
   virtual ~TSDescriptor() = default;
@@ -81,7 +79,7 @@ class TSDescriptor : public enable_make_shared<TSDescriptor> {
   // Register this tablet server.
   Status Register(const NodeInstancePB& instance,
                   const ServerRegistrationPB& registration,
-                  LocationCache* location_cache);
+                  const boost::optional<std::string>& location);
 
   const std::string &permanent_uuid() const { return permanent_uuid_; }
   int64_t latest_seqno() const;
@@ -143,9 +141,6 @@ class TSDescriptor : public enable_make_shared<TSDescriptor> {
   FRIEND_TEST(TestTSDescriptor, TestReplicaCreationsDecay);
   friend class PlacementPolicyTest;
 
-  Status RegisterUnlocked(const NodeInstancePB& instance,
-                          const ServerRegistrationPB& registration);
-
   // Uses DNS to resolve registered hosts to a single Sockaddr.
   // Returns the resolved address as well as the hostname associated with it
   // in 'addr' and 'host'.
diff --git a/src/kudu/master/ts_manager.cc b/src/kudu/master/ts_manager.cc
index 6bd5939..92a9b09 100644
--- a/src/kudu/master/ts_manager.cc
+++ b/src/kudu/master/ts_manager.cc
@@ -20,18 +20,31 @@
 #include <algorithm>
 #include <limits>
 #include <mutex>
-#include <vector>
 
+#include <boost/optional/optional.hpp>
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 
+#include "kudu/common/common.pb.h"
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/gutil/bind.h"
 #include "kudu/gutil/bind_helpers.h"
 #include "kudu/gutil/map-util.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/master/location_cache.h"
 #include "kudu/master/ts_descriptor.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/logging.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/pb_util.h"
+#include "kudu/util/trace.h"
+
+DEFINE_bool(location_mapping_by_uuid, false,
+            "Whether the location command is given tablet server identifier "
+            "instead of hostname/IP address (for tests only).");
+TAG_FLAG(location_mapping_by_uuid, hidden);
+TAG_FLAG(location_mapping_by_uuid, unsafe);
 
 METRIC_DEFINE_gauge_int32(server, cluster_replica_skew,
                           "Cluster Replica Skew",
@@ -41,9 +54,9 @@ METRIC_DEFINE_gauge_int32(server, cluster_replica_skew,
                           "the number of replicas on the tablet server hosting "
                           "the least replicas.");
 
+using kudu::pb_util::SecureShortDebugString;
 using std::shared_ptr;
 using std::string;
-using std::vector;
 using strings::Substitute;
 
 namespace kudu {
@@ -68,13 +81,13 @@ Status TSManager::LookupTS(const NodeInstancePB& instance,
     FindOrNull(servers_by_id_, instance.permanent_uuid());
   if (!found_ptr) {
     return Status::NotFound("unknown tablet server ID",
-        pb_util::SecureShortDebugString(instance));
+        SecureShortDebugString(instance));
   }
   const shared_ptr<TSDescriptor>& found = *found_ptr;
 
   if (instance.instance_seqno() != found->latest_seqno()) {
     return Status::NotFound("mismatched instance sequence number",
-                            pb_util::SecureShortDebugString(instance));
+                            SecureShortDebugString(instance));
   }
 
   *ts_desc = found;
@@ -90,24 +103,63 @@ bool TSManager::LookupTSByUUID(const string& uuid,
 Status TSManager::RegisterTS(const NodeInstancePB& instance,
                              const ServerRegistrationPB& registration,
                              std::shared_ptr<TSDescriptor>* desc) {
-  std::lock_guard<rw_spinlock> l(lock_);
+  // Pre-condition: registration info should contain at least one RPC end-point.
+  if (registration.rpc_addresses().empty()) {
+    return Status::InvalidArgument(
+        "invalid registration: must have at least one RPC address",
+        SecureShortDebugString(registration));
+  }
+
   const string& uuid = instance.permanent_uuid();
 
-  if (!ContainsKey(servers_by_id_, uuid)) {
-    shared_ptr<TSDescriptor> new_desc;
-    RETURN_NOT_OK(TSDescriptor::RegisterNew(
-        instance, registration, location_cache_, &new_desc));
-    InsertOrDie(&servers_by_id_, uuid, new_desc);
-    LOG(INFO) << Substitute("Registered new tserver with Master: $0",
-                            new_desc->ToString());
-    desc->swap(new_desc);
-  } else {
-    shared_ptr<TSDescriptor> found(FindOrDie(servers_by_id_, uuid));
-    RETURN_NOT_OK(found->Register(instance, registration, location_cache_));
-    LOG(INFO) << Substitute("Re-registered known tserver with Master: $0",
-                            found->ToString());
-    desc->swap(found);
+  // Assign the location for the tablet server outside the lock: assigning
+  // a location involves calling the location mapping script which is relatively
+  // long and expensive operation.
+  boost::optional<string> location;
+  if (PREDICT_TRUE(location_cache_ != nullptr)) {
+    // In some test scenarios the location is assigned per tablet server UUID.
+    // That's the case when multiple (or even all) tablet servers have the same
+    // IP address for their RPC endpoint.
+    const auto& cmd_arg = FLAGS_location_mapping_by_uuid
+        ? uuid : registration.rpc_addresses(0).host();
+    TRACE(Substitute("tablet server $0: assigning location", uuid));
+    string location_str;
+    const auto s = location_cache_->GetLocation(cmd_arg, &location_str);
+    TRACE(Substitute(
+        "tablet server $0: assigned location '$1'", uuid, location_str));
+
+    // If location resolution fails, log the error and return the status.
+    if (!s.ok()) {
+      CHECK(!registration.rpc_addresses().empty());
+      const auto& addr = registration.rpc_addresses(0);
+      KLOG_EVERY_N_SECS(ERROR, 60) << Substitute(
+          "Unable to assign location to tablet server $0: $1",
+          Substitute("$0 ($1:$2)", uuid, addr.host(), addr.port()),
+          s.ToString());
+      return s;
+    }
+    location.emplace(std::move(location_str));
+  }
+
+  shared_ptr<TSDescriptor> descriptor;
+  bool new_tserver = false;
+  {
+    std::lock_guard<rw_spinlock> l(lock_);
+    auto* descriptor_ptr = FindOrNull(servers_by_id_, uuid);
+    if (descriptor_ptr) {
+      descriptor = *descriptor_ptr;
+      RETURN_NOT_OK(descriptor->Register(instance, registration, location));
+    } else {
+      RETURN_NOT_OK(TSDescriptor::RegisterNew(
+          instance, registration, location, &descriptor));
+      InsertOrDie(&servers_by_id_, uuid, descriptor);
+      new_tserver = true;
+    }
   }
+  LOG(INFO) << Substitute("$0 tserver with Master: $1",
+                          new_tserver ? "Registered new" : "Re-registered known",
+                          descriptor->ToString());
+  *desc = std::move(descriptor);
 
   return Status::OK();
 }


Mime
View raw message