kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [kudu] branch master updated: KUDU-2842: don't reference CowLock state from TSInfosDict
Date Fri, 20 Sep 2019 21:31:03 GMT
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 15ecb24  KUDU-2842: don't reference CowLock state from TSInfosDict
15ecb24 is described below

commit 15ecb240d8055743090f8739e8e29dd8b9fea0b3
Author: Andrew Wong <awong@cloudera.com>
AuthorDate: Thu Sep 19 01:01:47 2019 -0700

    KUDU-2842: don't reference CowLock state from TSInfosDict
    
    The following race was previously possible:
    
    Tablet A belongs to table T
    1. T1: receive GetTableLocations for table T
    2. T1: lock A in READ mode
    3. T1: add StringPiece(A) => TSInfoPB to dict, where StringPiece(A) points
           to the CowLock state
    4. T1: unlock A in READ mode
    5. T2: receive ProcessFullTabletReport that has an updated cstate for A
    6. T2: lock A in WRITE mode
    7. T2: COMMIT mutation to A based on the report, blowing away old in-memory
           CowLock state, and unlock A
    8. T1: try to update dict, but key StringPiece(A) is corrupted: it still
           points into the old CowLock state that was blown away in step 7
    
    This patch addresses this by making the following change:
    
    3. T1: add StringPiece(A) => TSInfoPB to dict, where StringPiece(A)
           points into the TSInfoPB
    
    This patch adds a variant of the ComputeIfAbsent() map utility that
    facilitates this self-referential map insertion.
    
    Testing:
    - adds a test for the new map function
    - adds a test that runs many elections while concurrently hammering the
      GetTableLocations endpoint; without this patch, the test would hit
      data races
    - I also reran the benchmark posted in 586e957f7 to verify this doesn't regress
      performance
    
    With this patch:
    ================
    Count: 57230
    Mean: 1341.46
    Percentiles:
       0%  (min) = 69
      25%        = 772
      50%  (med) = 1408
      75%        = 1728
      95%        = 2240
      99%        = 2736
      99.9%      = 3616
      99.99%     = 5440
      100% (max) = 11976
    
    Without this patch:
    ===================
    Count: 56325
    Mean: 1360.05
    Percentiles:
       0%  (min) = 122
      25%        = 980
      50%  (med) = 1408
      75%        = 1712
      95%        = 2192
      99%        = 2656
      99.9%      = 3392
      99.99%     = 5088
      100% (max) = 8544
    
    Change-Id: I30f4cd2eb8439e1923c1c2617248514354561d16
    Reviewed-on: http://gerrit.cloudera.org:8080/14263
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/gutil/map-util.h                          |  43 +++++-
 .../integration-tests/ts_tablet_manager-itest.cc   | 159 +++++++++++++++------
 src/kudu/master/catalog_manager.cc                 |  11 +-
 src/kudu/util/map-util-test.cc                     |  36 +++++
 4 files changed, 199 insertions(+), 50 deletions(-)

diff --git a/src/kudu/gutil/map-util.h b/src/kudu/gutil/map-util.h
index 1876547..7b6d7bb 100644
--- a/src/kudu/gutil/map-util.h
+++ b/src/kudu/gutil/map-util.h
@@ -875,20 +875,47 @@ void AppendValuesFromMap(const MapContainer& map_container,
 // MyValue* const value = result.first;
 // if (result.second) ....
 //
+// The ComputePair* variants expect a lambda that creates a pair<k, v>. This
+// can be useful if the key is a StringPiece pointing to external state to
+// avoid excess memory for the keys, while being safer in multi-threaded
+// contexts, e.g. in case the key goes out of scope before the container does.
+//
+// Example usage:
+//
+// map<StringPiece, int, GoodFastHash<StringPiece>> string_to_idx;
+// vector<unique_ptr<StringPB>> pbs;
+// auto result = ComputePairIfAbsentReturnAbsense(&string_to_idx, my_key,
+//     [&]() {
+//       unique_ptr<StringPB> s = new StringPB();
+//       s->set_string(my_key);
+//       int idx = pbs.size();
+//       pbs.emplace_back(s.release());
+//       return make_pair(StringPiece(pbs.back()->string()), idx);
+//     });
 template <class MapContainer, typename Function>
 std::pair<typename MapContainer::mapped_type* const, bool>
-ComputeIfAbsentReturnAbsense(MapContainer* container,
-                             const typename MapContainer::key_type& key,
-                             Function compute_func) {
+ComputePairIfAbsentReturnAbsense(MapContainer* container,
+                                 const typename MapContainer::key_type& key,
+                                 Function compute_pair_func) {
   typename MapContainer::iterator iter = container->find(key);
   bool new_value = iter == container->end();
   if (new_value) {
+    auto p = compute_pair_func();
     std::pair<typename MapContainer::iterator, bool> result =
-        container->emplace(key, compute_func());
+        container->emplace(std::move(p.first), std::move(p.second));
     DCHECK(result.second) << "duplicate key: " << key;
     iter = result.first;
   }
   return std::make_pair(&iter->second, new_value);
+}
+template <class MapContainer, typename Function>
+std::pair<typename MapContainer::mapped_type* const, bool>
+ComputeIfAbsentReturnAbsense(MapContainer* container,
+                             const typename MapContainer::key_type& key,
+                             Function compute_func) {
+  return ComputePairIfAbsentReturnAbsense(container, key, [&key, &compute_func] {
+    return std::make_pair(key, compute_func());
+  });
 };
 
 // Like the above but doesn't return a pair, just returns a pointer to the value.
@@ -906,4 +933,12 @@ ComputeIfAbsent(MapContainer* container,
   return ComputeIfAbsentReturnAbsense(container, key, compute_func).first;
 };
 
+template <class MapContainer, typename Function>
+typename MapContainer::mapped_type* const
+ComputePairIfAbsent(MapContainer* container,
+                    const typename MapContainer::key_type& key,
+                    Function compute_pair_func) {
+  return ComputePairIfAbsentReturnAbsense<MapContainer, Function>(container, key, compute_pair_func).first;
+};
+
 #endif  // UTIL_GTL_MAP_UTIL_H_
diff --git a/src/kudu/integration-tests/ts_tablet_manager-itest.cc b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
index 6f42415..d728d46 100644
--- a/src/kudu/integration-tests/ts_tablet_manager-itest.cc
+++ b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
@@ -23,6 +23,7 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <thread>
 #include <unordered_map>
 #include <utility>
 #include <vector>
@@ -41,6 +42,7 @@
 #include "kudu/consensus/metadata.pb.h"
 #include "kudu/consensus/quorum_util.h"
 #include "kudu/consensus/raft_consensus.h"
+#include "kudu/gutil/basictypes.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stl_util.h"
@@ -56,6 +58,7 @@
 #include "kudu/master/table_metrics.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
 #include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_controller.h"
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/tserver/heartbeater.h"
@@ -63,12 +66,14 @@
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/tserver/tablet_server_options.h"
 #include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/util/countdown_latch.h"
 #include "kudu/util/jsonwriter.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/pb_util.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -104,6 +109,8 @@ using kudu::consensus::RaftPeerPB;
 using kudu::itest::SimpleIntKeyKuduSchema;
 using kudu::KuduPartialRow;
 using kudu::master::CatalogManager;
+using kudu::master::GetTableLocationsResponsePB;
+using kudu::master::GetTableLocationsRequestPB;
 using kudu::master::Master;
 using kudu::master::MasterServiceProxy;
 using kudu::master::ReportedTabletPB;
@@ -117,6 +124,7 @@ using kudu::ClusterVerifier;
 using std::map;
 using std::shared_ptr;
 using std::string;
+using std::thread;
 using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
@@ -268,51 +276,125 @@ INSTANTIATE_TEST_CASE_P(,
                         FailedTabletsAreReplacedITest,
                         ::testing::Bool());
 
-// Test that when the leader changes, the tablet manager gets notified and
-// includes that information in the next tablet report.
-TEST_F(TsTabletManagerITest, TestReportNewLeaderOnLeaderChange) {
+class LeadershipChangeReportingTest : public TsTabletManagerITest {
+ public:
   const int kNumReplicas = 2;
-  {
+  void SetUp() override {
+    NO_FATALS(TsTabletManagerITest::SetUp());
+
+    // For tests that heartbeat, set a lower interval to speed things up.
+    FLAGS_heartbeat_interval_ms = 100;
+
     InternalMiniClusterOptions opts;
     opts.num_tablet_servers = kNumReplicas;
     NO_FATALS(StartCluster(std::move(opts)));
-  }
 
-  // We need to control elections precisely for this test since we're using
-  // EmulateElection() with a distributed consensus configuration.
-  FLAGS_enable_leader_failure_detection = false;
-  FLAGS_catalog_manager_wait_for_new_tablets_to_elect_leader = false;
+    // We need to control elections precisely for this test since we're using
+    // EmulateElection() with a distributed consensus configuration.
+    FLAGS_enable_leader_failure_detection = false;
+    FLAGS_catalog_manager_wait_for_new_tablets_to_elect_leader = false;
 
-  // Allow creating table with even replication factor.
-  FLAGS_allow_unsafe_replication_factor = true;
+    // Allow creating table with even replication factor.
+    FLAGS_allow_unsafe_replication_factor = true;
 
-  // Run a few more iters in slow-test mode.
-  OverrideFlagForSlowTests("num_election_test_loops", "10");
+    // Run a few more iters in slow-test mode.
+    OverrideFlagForSlowTests("num_election_test_loops", "10");
 
-  // Create the table.
-  client::sp::shared_ptr<KuduTable> table;
-  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
-  ASSERT_OK(table_creator->table_name(kTableName)
-            .schema(&schema_)
-            .set_range_partition_columns({ "key" })
-            .num_replicas(kNumReplicas)
-            .Create());
-  ASSERT_OK(client_->OpenTable(kTableName, &table));
+    // Build a TServerDetails map so we can check for convergence.
+    const auto& addr = cluster_->mini_master()->bound_rpc_addr();
+    master_proxy_.reset(new MasterServiceProxy(client_messenger_, addr, addr.host()));
+  }
 
-  // Build a TServerDetails map so we can check for convergence.
-  const auto& addr = cluster_->mini_master()->bound_rpc_addr();
-  shared_ptr<MasterServiceProxy> master_proxy(
-      new MasterServiceProxy(client_messenger_, addr, addr.host()));
+  // Creates 'num_hashes' tablets with two replicas each.
+  Status CreateTable(int num_hashes) {
+    // Create the table.
+    client::sp::shared_ptr<KuduTable> table;
+    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+    if (num_hashes > 1) {
+      RETURN_NOT_OK(table_creator->table_name(kTableName)
+          .schema(&schema_)
+          .set_range_partition_columns({ "key" })
+          .num_replicas(kNumReplicas)
+          .add_hash_partitions({ "key" }, num_hashes)
+          .Create());
+    } else {
+      RETURN_NOT_OK(table_creator->table_name(kTableName)
+          .schema(&schema_)
+          .set_range_partition_columns({ "key" })
+          .num_replicas(kNumReplicas)
+          .Create());
+    }
+    RETURN_NOT_OK(client_->OpenTable(kTableName, &table));
 
-  itest::TabletServerMap ts_map;
-  ASSERT_OK(CreateTabletServerMap(master_proxy, client_messenger_, &ts_map));
-  ValueDeleter deleter(&ts_map);
+    RETURN_NOT_OK(CreateTabletServerMap(master_proxy_, client_messenger_, &ts_map_));
 
-  // Collect the TabletReplicas so we get direct access to RaftConsensus.
-  vector<scoped_refptr<TabletReplica>> tablet_replicas;
-  ASSERT_OK(PrepareTabletReplicas(MonoDelta::FromSeconds(60), &tablet_replicas));
-  ASSERT_EQ(kNumReplicas, tablet_replicas.size());
+    // Collect the TabletReplicas so we get direct access to RaftConsensus.
+    RETURN_NOT_OK(PrepareTabletReplicas(MonoDelta::FromSeconds(60), &tablet_replicas_));
+    CHECK_EQ(kNumReplicas * num_hashes, tablet_replicas_.size());
+    return Status::OK();
+  }
+  void TearDown() override {
+    ValueDeleter deleter(&ts_map_);
+    NO_FATALS(TsTabletManagerITest::TearDown());
+  }
+
+  Status TriggerElection(int min_term = 0, int* new_leader_idx = nullptr) {
+    int leader_idx = rand() % tablet_replicas_.size();
+    LOG(INFO) << "Electing peer " << leader_idx << "...";
+    RaftConsensus* con = CHECK_NOTNULL(tablet_replicas_[leader_idx]->consensus());
+    RETURN_NOT_OK(con->EmulateElection());
+    LOG(INFO) << "Waiting for servers to agree...";
+    RETURN_NOT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(5), ts_map_,
+        tablet_replicas_[leader_idx]->tablet_id(), min_term));
+    if (new_leader_idx) {
+      *new_leader_idx = leader_idx;
+    }
+    return Status::OK();
+  }
+
+ protected:
+  shared_ptr<MasterServiceProxy> master_proxy_;
+  vector<scoped_refptr<TabletReplica>> tablet_replicas_;
+  itest::TabletServerMap ts_map_;
+};
 
+// Regression test for KUDU-2842: concurrent calls to GetTableLocations()
+// shouldn't lead to data races with the changes in reported state.
+TEST_F(LeadershipChangeReportingTest, TestConcurrentGetTableLocations) {
+  // KUDU-2842 requires there to be multiple tablets in a given report, so
+  // create multiple tablets.
+  int kNumTablets = 2;
+  ASSERT_OK(CreateTable(kNumTablets));
+  CountDownLatch latch(1);
+  thread t([&] {
+    master::GetTableLocationsRequestPB req;
+    req.mutable_table()->set_table_name(kTableName);
+    req.set_intern_ts_infos_in_response(true);
+    while (!latch.WaitFor(MonoDelta::FromMilliseconds(10))) {
+      master::GetTableLocationsResponsePB resp;
+      rpc::RpcController rpc;
+      // Note: we only really care about data races, rather than the responses.
+      ignore_result(master_proxy_->GetTableLocations(req, &resp, &rpc));
+    }
+  });
+  SCOPED_CLEANUP({
+    latch.CountDown();
+    NO_FATALS(t.join());
+  });
+  for (int i = 0; i < FLAGS_num_election_test_loops; i++) {
+    for (int t = 0; t < kNumTablets; t++) {
+      // Note: we only really care about data races, rather than the success of
+      // all the elections.
+      ignore_result(TriggerElection());
+    }
+    SleepFor(MonoDelta::FromMilliseconds(2 * FLAGS_heartbeat_interval_ms));
+  }
+}
+
+// Test that when the leader changes, the tablet manager gets notified and
+// includes that information in the next tablet report.
+TEST_F(LeadershipChangeReportingTest, TestUpdatedConsensusState) {
+  ASSERT_OK(CreateTable(1));
   // Stop heartbeating we don't race against the Master.
   DisableHeartbeatingToMaster();
 
@@ -320,13 +402,8 @@ TEST_F(TsTabletManagerITest, TestReportNewLeaderOnLeaderChange) {
   // TSTabletManager should acknowledge the role changes via tablet reports.
   for (int i = 0; i < FLAGS_num_election_test_loops; i++) {
     SCOPED_TRACE(Substitute("Iter: $0", i));
-    int new_leader_idx = rand() % 2;
-    LOG(INFO) << "Electing peer " << new_leader_idx << "...";
-    RaftConsensus* con = CHECK_NOTNULL(tablet_replicas[new_leader_idx]->consensus());
-    ASSERT_OK(con->EmulateElection());
-    LOG(INFO) << "Waiting for servers to agree...";
-    ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(5),
-                                    ts_map, tablet_replicas[0]->tablet_id(), i + 1));
+    int new_leader_idx;
+    ASSERT_OK(TriggerElection(i + 1, &new_leader_idx));
 
     // Now check that the tablet report reports the correct role for both servers.
     for (int replica = 0; replica < kNumReplicas; replica++) {
@@ -342,7 +419,7 @@ TEST_F(TsTabletManagerITest, TestReportNewLeaderOnLeaderChange) {
       const ReportedTabletPB& reported_tablet = report.updated_tablets(0);
       ASSERT_TRUE(reported_tablet.has_consensus_state());
 
-      string uuid = tablet_replicas[replica]->permanent_uuid();
+      string uuid = tablet_replicas_[replica]->permanent_uuid();
       RaftPeerPB::Role role = GetConsensusRole(uuid, reported_tablet.consensus_state());
       if (replica == new_leader_idx) {
         ASSERT_EQ(RaftPeerPB::LEADER, role)
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 34a41ed..8f2d1f9 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -4843,13 +4843,14 @@ Status CatalogManager::BuildLocationsForTablet(
       dimension = l_tablet.data().pb.dimension_label();
     }
     if (ts_infos_dict) {
-      int idx = *ComputeIfAbsent(
+      int idx = *ComputePairIfAbsent(
           &ts_infos_dict->uuid_to_idx, peer.permanent_uuid(),
-          [&]() -> int {
+          [&]() -> pair<StringPiece, int> {
+            auto& ts_info_pbs = ts_infos_dict->ts_info_pbs;
             auto pb = make_tsinfo_pb();
-            int idx = ts_infos_dict->ts_info_pbs.size();
-            ts_infos_dict->ts_info_pbs.emplace_back(pb.release());
-            return idx;
+            int ts_info_idx = ts_info_pbs.size();
+            ts_info_pbs.emplace_back(pb.release());
+            return { ts_info_pbs.back()->permanent_uuid(), ts_info_idx };
           });
 
       auto* interned_replica_pb = locs_pb->add_interned_replicas();
diff --git a/src/kudu/util/map-util-test.cc b/src/kudu/util/map-util-test.cc
index 78a3484..51b428e 100644
--- a/src/kudu/util/map-util-test.cc
+++ b/src/kudu/util/map-util-test.cc
@@ -22,14 +22,22 @@
 #include <map>
 #include <memory>
 #include <string>
+#include <unordered_map>
 #include <utility>
+#include <vector>
 
 #include <gtest/gtest.h>
 
+#include "kudu/gutil/strings/stringpiece.h"
+
+template <class X> struct GoodFastHash;
+
 using std::map;
 using std::string;
 using std::shared_ptr;
 using std::unique_ptr;
+using std::unordered_map;
+using std::vector;
 
 namespace kudu {
 
@@ -69,6 +77,34 @@ TEST(ComputeIfAbsentTest, TestComputeIfAbsentAndReturnAbsense) {
   ASSERT_EQ(*result2.first, "hello_world");
 }
 
+namespace {
+// Simple struct to act as a container for a string. While not necessary per
+// se, this is more representative of the expected usage of
+// ComputePairIfAbsent* (i.e. pointing to internal state of more complex
+// objects).
+struct SimpleStruct {
+  string str;
+};
+} // anonymous namespace
+
+TEST(ComputePairIfAbsentTest, TestComputePairDestructState) {
+  unordered_map<StringPiece, int, GoodFastHash<StringPiece>> string_to_idx;
+  const string kBigKey = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
+  string big_key = kBigKey;
+  vector<SimpleStruct> big_structs;
+  auto result = ComputePairIfAbsentReturnAbsense(&string_to_idx, big_key,
+      [&] () -> std::pair<StringPiece, int> {
+        int idx = big_structs.size();
+        big_structs.emplace_back(SimpleStruct({ big_key }));
+        return { big_structs.back().str, idx };
+      });
+  // Clear the original key state. This shouldn't have any effect on the map.
+  big_key.clear();
+  ASSERT_TRUE(result.second);
+  ASSERT_EQ(*result.first, FindOrDie(string_to_idx, kBigKey));
+  ASSERT_EQ(kBigKey, big_structs[*result.first].str);
+}
+
 TEST(FindPointeeOrNullTest, TestFindPointeeOrNull) {
   map<string, unique_ptr<string>> my_map;
   auto iter = my_map.emplace("key", unique_ptr<string>(new string("hello_world")));


Mime
View raw message