From commits-return-8622-archive-asf-public=cust-asf.ponee.io@kudu.apache.org Wed Mar 18 06:45:21 2020 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 22F5E180677 for ; Wed, 18 Mar 2020 07:45:20 +0100 (CET) Received: (qmail 87098 invoked by uid 500); 18 Mar 2020 06:45:19 -0000 Mailing-List: contact commits-help@kudu.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kudu.apache.org Delivered-To: mailing list commits@kudu.apache.org Received: (qmail 86987 invoked by uid 99); 18 Mar 2020 06:45:19 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Mar 2020 06:45:19 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 18C948DACD; Wed, 18 Mar 2020 06:45:19 +0000 (UTC) Date: Wed, 18 Mar 2020 06:45:20 +0000 To: "commits@kudu.apache.org" Subject: [kudu] 02/03: remove kudu::Thread from tests MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: adar@apache.org In-Reply-To: <158451391892.11542.10097069180443385814@gitbox.apache.org> References: <158451391892.11542.10097069180443385814@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: kudu X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Rev: 9a0ad9569286dedc3ba699d37f95e96070b548cc X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20200318064519.18C948DACD@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. adar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git commit 9a0ad9569286dedc3ba699d37f95e96070b548cc Author: Adar Dembo AuthorDate: Sat Mar 14 02:05:16 2020 -0700 remove kudu::Thread from tests std::thread is much more ergonomic, evidenced in part by the silly kudu::Thread names and categories found in tests. I retained kudu::Thread in debug-util-test and trace-test, both of which rely on some internal Kudu threading detail. And of course in thread-test which actually exercises this functionality. Change-Id: I39b87fc3d8542143507edf79a5ca05a7fc27ffd4 Reviewed-on: http://gerrit.cloudera.org:8080/15447 Reviewed-by: Alexey Serbin Tested-by: Kudu Jenkins Reviewed-by: Andrew Wong --- src/kudu/benchmarks/tpch/tpch_real_world.cc | 38 ++----- src/kudu/benchmarks/wal_hiccup.cc | 12 +- src/kudu/cfile/mt-bloomfile-test.cc | 28 ++--- src/kudu/client/client-test.cc | 16 ++- src/kudu/clock/hybrid_clock-test.cc | 12 +- src/kudu/consensus/mt-log-test.cc | 35 +++--- src/kudu/fs/block_manager-stress-test.cc | 31 ++--- src/kudu/fs/block_manager-test.cc | 23 ++-- src/kudu/integration-tests/alter_table-test.cc | 31 ++--- src/kudu/integration-tests/client-stress-test.cc | 36 +++--- src/kudu/integration-tests/consistency-itest.cc | 34 +++--- src/kudu/integration-tests/create-table-itest.cc | 32 +++--- .../integration-tests/exactly_once_writes-itest.cc | 55 ++++----- .../full_stack-insert-scan-test.cc | 22 ++-- src/kudu/integration-tests/linked_list-test-util.h | 16 ++- .../integration-tests/master_replication-itest.cc | 21 ++-- .../integration-tests/raft_consensus-itest-base.cc | 16 ++- .../integration-tests/raft_consensus-itest-base.h | 9 +- src/kudu/integration-tests/raft_consensus-itest.cc | 126 +++++++++------------ .../raft_consensus_election-itest.cc | 5 +- .../tablet_copy_client_session-itest.cc | 30 ++--- src/kudu/integration-tests/ts_recovery-itest.cc | 18 ++- .../update_scan_delta_compact-test.cc | 44 ++----- src/kudu/master/hms_notification_log_listener.cc | 3 +- src/kudu/mini-cluster/webui_checker.cc | 12 +- src/kudu/mini-cluster/webui_checker.h | 6 +- src/kudu/rpc/exactly_once_rpc-test.cc | 65 +++++------ src/kudu/rpc/mt-rpc-test.cc | 67 +++++------ src/kudu/rpc/rpc-test-base.h | 8 +- src/kudu/rpc/rpc-test.cc | 27 ++--- src/kudu/tablet/lock_manager-test.cc | 18 ++- src/kudu/tablet/mt-rowset_delta_compaction-test.cc | 35 ++---- src/kudu/tablet/mt-tablet-test.cc | 105 +++++++++-------- src/kudu/tablet/tablet_random_access-test.cc | 20 ++-- .../transactions/transaction_tracker-test.cc | 10 +- src/kudu/tools/ksck_remote-test.cc | 25 ++-- src/kudu/tserver/tablet_server-stress-test.cc | 30 +++-- src/kudu/tserver/tablet_server-test.cc | 21 ++-- src/kudu/util/countdown_latch-test.cc | 18 +-- src/kudu/util/maintenance_manager-test.cc | 19 ++-- src/kudu/util/mt-hdr_histogram-test.cc | 43 ++++--- src/kudu/util/mt-metrics-test.cc | 27 ++--- src/kudu/util/mt-threadlocal-test.cc | 85 +++++++------- src/kudu/util/once-test.cc | 21 ++-- src/kudu/util/pstack_watcher.cc | 1 + src/kudu/util/striped64-test.cc | 35 +++--- src/kudu/util/test_graph.cc | 10 +- src/kudu/util/test_graph.h | 11 +- 48 files changed, 619 insertions(+), 793 deletions(-) diff --git a/src/kudu/benchmarks/tpch/tpch_real_world.cc b/src/kudu/benchmarks/tpch/tpch_real_world.cc index 0731516..0c49a3c 100644 --- a/src/kudu/benchmarks/tpch/tpch_real_world.cc +++ b/src/kudu/benchmarks/tpch/tpch_real_world.cc @@ -49,6 +49,7 @@ #include #include #include +#include #include #include @@ -63,7 +64,6 @@ #include "kudu/client/row_result.h" #include "kudu/client/schema.h" #include "kudu/common/partial_row.h" -#include "kudu/gutil/ref_counted.h" #include "kudu/gutil/stl_util.h" #include "kudu/gutil/stringprintf.h" #include "kudu/gutil/strings/join.h" @@ -80,7 +80,6 @@ #include "kudu/util/status.h" #include "kudu/util/stopwatch.h" #include "kudu/util/subprocess.h" -#include "kudu/util/thread.h" DEFINE_bool(tpch_use_mini_cluster, true, "Create a mini cluster for the work to be performed against"); @@ -126,6 +125,7 @@ using kudu::client::KuduSchema; using kudu::cluster::ExternalMiniCluster; using kudu::cluster::ExternalMiniClusterOptions; using std::string; +using std::thread; using std::unique_ptr; using std::vector; using strings::Substitute; @@ -156,7 +156,7 @@ class TpchRealWorld { void WaitForRowCount(int64_t row_count); - Status Run(); + void Run(); private: static const char* kLineItemBase; @@ -385,19 +385,12 @@ void TpchRealWorld::WaitForRowCount(int64_t row_count) { } } -Status TpchRealWorld::Run() { - vector > threads; +void TpchRealWorld::Run() { + vector threads; if (FLAGS_tpch_load_data) { for (int i = 0; i < FLAGS_tpch_num_inserters; i++) { - scoped_refptr thr; - RETURN_NOT_OK(kudu::Thread::Create("test", Substitute("lineitem-gen$0", i), - &TpchRealWorld::MonitorDbgenThread, this, i, - &thr)); - threads.push_back(thr); - RETURN_NOT_OK(kudu::Thread::Create("test", Substitute("lineitem-load$0", i), - &TpchRealWorld::LoadLineItemsThread, this, i, - &thr)); - threads.push_back(thr); + threads.emplace_back([this, i]() { this->MonitorDbgenThread(i); }); + threads.emplace_back([this, i]() { this->LoadLineItemsThread(i); }); } // It takes some time for dbgen to start outputting rows so there's no need to query yet. @@ -406,11 +399,7 @@ Status TpchRealWorld::Run() { } if (FLAGS_tpch_run_queries) { - scoped_refptr thr; - RETURN_NOT_OK(kudu::Thread::Create("test", "lineitem-query", - &TpchRealWorld::RunQueriesThread, this, - &thr)); - threads.push_back(thr); + threads.emplace_back([this]() { this->RunQueriesThread(); }); } // We'll wait until all the dbgens finish or after tpch_test_runtime_sec, @@ -431,10 +420,9 @@ Status TpchRealWorld::Run() { stop_threads_.Store(true); - for (const auto& thr : threads) { - RETURN_NOT_OK(ThreadJoiner(thr.get()).Join()); + for (auto& t : threads) { + t.join(); } - return Status::OK(); } } // namespace kudu @@ -449,10 +437,6 @@ int main(int argc, char* argv[]) { std::cerr << "Couldn't initialize the benchmarking tool, reason: "<< s.ToString() << std::endl; return 1; } - s = benchmarker.Run(); - if (!s.ok()) { - std::cerr << "Couldn't run the benchmarking tool, reason: "<< s.ToString() << std::endl; - return 1; - } + benchmarker.Run(); return 0; } diff --git a/src/kudu/benchmarks/wal_hiccup.cc b/src/kudu/benchmarks/wal_hiccup.cc index 9dc3eec..03d9d6a 100644 --- a/src/kudu/benchmarks/wal_hiccup.cc +++ b/src/kudu/benchmarks/wal_hiccup.cc @@ -23,12 +23,12 @@ #include #include #include +#include #include #include #include -#include "kudu/gutil/ref_counted.h" #include "kudu/gutil/stl_util.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/gutil/walltime.h" @@ -37,9 +37,7 @@ #include "kudu/util/logging.h" #include "kudu/util/monotime.h" #include "kudu/util/path_util.h" -#include "kudu/util/status.h" #include "kudu/util/stopwatch.h" -#include "kudu/util/thread.h" DEFINE_int32(num_files, 40, "number of files to write"); DEFINE_int32(file_size_mb, 16, "size of each file"); @@ -65,6 +63,7 @@ DEFINE_bool(page_align_wal_writes, false, "write to the fake WAL with exactly 4KB writes to never cross pages"); using std::string; +using std::thread; using std::vector; namespace kudu { @@ -244,10 +243,7 @@ void WalHiccupBenchmarker::PrintConfig() { void WalHiccupBenchmarker::RunOnce() { finished_.Reset(1); - scoped_refptr thr; - CHECK_OK(Thread::Create("test", "wal", &WalHiccupBenchmarker::WALThread, this, &thr)); - - + thread thr([this]() { this->WALThread(); }); int fds[FLAGS_num_files]; for (int i = 0; i < FLAGS_num_files; i++) { WriteFile(strings::Substitute("file-$0", i), @@ -282,7 +278,7 @@ void WalHiccupBenchmarker::RunOnce() { LOG(INFO) << "Done closing..."; finished_.CountDown(); - thr->Join(); + thr.join(); } } // namespace kudu diff --git a/src/kudu/cfile/mt-bloomfile-test.cc b/src/kudu/cfile/mt-bloomfile-test.cc index 940be0f..954fc2b 100644 --- a/src/kudu/cfile/mt-bloomfile-test.cc +++ b/src/kudu/cfile/mt-bloomfile-test.cc @@ -15,24 +15,20 @@ // specific language governing permissions and limitations // under the License. -#include "kudu/cfile/bloomfile-test-base.h" - -#include +#include #include -#include // IWYU pragma: keep #include -#include #include -#include "kudu/gutil/ref_counted.h" -#include "kudu/gutil/strings/substitute.h" -#include "kudu/util/status.h" +#include "kudu/cfile/bloomfile-test-base.h" #include "kudu/util/test_macros.h" -#include "kudu/util/thread.h" DEFINE_int32(benchmark_num_threads, 8, "Number of threads to use for the benchmark"); +using std::thread; +using std::vector; + namespace kudu { namespace cfile { @@ -43,17 +39,13 @@ TEST_F(MTBloomFileTest, Benchmark) { NO_FATALS(WriteTestBloomFile()); ASSERT_OK(OpenBloomFile()); - std::vector > threads; - + vector threads; + threads.reserve(FLAGS_benchmark_num_threads); for (int i = 0; i < FLAGS_benchmark_num_threads; i++) { - scoped_refptr new_thread; - CHECK_OK(Thread::Create("test", strings::Substitute("t$0", i), - boost::bind(&BloomFileTestBase::ReadBenchmark, this), - &new_thread)); - threads.push_back(new_thread); + threads.emplace_back([this]() { this->ReadBenchmark(); }); } - for (scoped_refptr& t : threads) { - t->Join(); + for (auto& t : threads) { + t.join(); } } diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc index a3be4b3..511ebf6 100644 --- a/src/kudu/client/client-test.cc +++ b/src/kudu/client/client-test.cc @@ -5570,14 +5570,12 @@ TEST_F(ClientTest, TestServerTooBusyRetry) { } bool stop = false; - vector > threads; - int t = 0; + vector threads; while (!stop) { - scoped_refptr thread; - ASSERT_OK(kudu::Thread::Create("test", Substitute("t$0", t++), - &ClientTest::CheckRowCount, this, client_table_.get(), kNumRows, - &thread)); - threads.push_back(thread); + threads.emplace_back([this]() { + this->CheckRowCount(this->client_table_.get(), kNumRows); + }); + // Don't start threads too fast - otherwise we could accumulate tens or hundreds // of threads before any of them starts their actual scans, and then it would // take a long time to join on them all eventually finishing down below. @@ -5590,8 +5588,8 @@ TEST_F(ClientTest, TestServerTooBusyRetry) { } } - for (const scoped_refptr& thread : threads) { - thread->Join(); + for (auto& t : threads) { + t.join(); } } diff --git a/src/kudu/clock/hybrid_clock-test.cc b/src/kudu/clock/hybrid_clock-test.cc index d0bd9f5..80b1d57 100644 --- a/src/kudu/clock/hybrid_clock-test.cc +++ b/src/kudu/clock/hybrid_clock-test.cc @@ -49,7 +49,6 @@ #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" DECLARE_bool(inject_unsync_time_errors); DECLARE_string(time_source); @@ -311,21 +310,18 @@ void StresserThread(HybridClock* clock, AtomicBool* stop) { // Regression test for KUDU-953: if threads are updating and polling the // clock concurrently, the clock should still never run backwards. TEST_F(HybridClockTest, TestClockDoesntGoBackwardsWithUpdates) { - vector> threads; + vector threads; AtomicBool stop(false); SCOPED_CLEANUP({ stop.Store(true); - for (const auto& t : threads) { - t->Join(); + for (auto& t : threads) { + t.join(); } }); for (int i = 0; i < 4; i++) { - scoped_refptr thread; - ASSERT_OK(Thread::Create("test", "stresser", &StresserThread, &clock_, &stop, - &thread)); - threads.push_back(thread); + threads.emplace_back([&]() { StresserThread(&clock_, &stop); }); } SleepFor(MonoDelta::FromSeconds(1)); diff --git a/src/kudu/consensus/mt-log-test.cc b/src/kudu/consensus/mt-log-test.cc index 5a540d4..0620d82 100644 --- a/src/kudu/consensus/mt-log-test.cc +++ b/src/kudu/consensus/mt-log-test.cc @@ -56,7 +56,6 @@ #include "kudu/util/stopwatch.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" DEFINE_int32(num_writer_threads, 4, "Number of threads writing to the log"); DEFINE_int32(num_reader_threads, 1, "Number of threads accessing the log while writes are ongoing"); @@ -67,16 +66,18 @@ DEFINE_bool(verify_log, true, "Whether to verify the log by reading it after the DECLARE_int32(log_thread_idle_threshold_ms); DECLARE_int32(log_inject_thread_lifecycle_latency_ms); -namespace kudu { -namespace log { - +using kudu::consensus::OpId; +using kudu::consensus::ReplicateRefPtr; +using kudu::consensus::ReplicateMsg; +using kudu::consensus::WRITE_OP; +using kudu::consensus::make_scoped_refptr_replicate; +using std::map; using std::shared_ptr; +using std::thread; using std::vector; -using consensus::OpId; -using consensus::ReplicateRefPtr; -using consensus::ReplicateMsg; -using consensus::WRITE_OP; -using consensus::make_scoped_refptr_replicate; + +namespace kudu { +namespace log { namespace { @@ -168,19 +169,17 @@ class MultiThreadedLogTest : public LogTestBase { void Run() { for (int i = 0; i < FLAGS_num_writer_threads; i++) { - scoped_refptr new_thread; - CHECK_OK(kudu::Thread::Create("test", "inserter", - &MultiThreadedLogTest::LogWriterThread, this, i, &new_thread)); - threads_.push_back(new_thread); + threads_.emplace_back([this, i]() { this->LogWriterThread(i); }); } // Start a thread which calls some read-only methods on the log // to check for races against writers. std::atomic stop_reader(false); - vector reader_threads; + vector reader_threads; + reader_threads.reserve(FLAGS_num_reader_threads); for (int i = 0; i < FLAGS_num_reader_threads; i++) { reader_threads.emplace_back([&]() { - std::map map; + map map; while (!stop_reader) { log_->GetReplaySizeMap(&map); log_->GetGCableDataSize(RetentionIndexes(FLAGS_num_batches_per_thread)); @@ -189,8 +188,8 @@ class MultiThreadedLogTest : public LogTestBase { } // Wait for the writers to finish. - for (scoped_refptr& thread : threads_) { - ASSERT_OK(ThreadJoiner(thread.get()).Join()); + for (auto& t : threads_) { + t.join(); } // Then stop the reader and join on it as well. @@ -224,7 +223,7 @@ class MultiThreadedLogTest : public LogTestBase { private: ThreadSafeRandom random_; simple_spinlock lock_; - vector > threads_; + vector threads_; }; TEST_F(MultiThreadedLogTest, TestAppends) { diff --git a/src/kudu/fs/block_manager-stress-test.cc b/src/kudu/fs/block_manager-stress-test.cc index 9ea3798..5ecd850 100644 --- a/src/kudu/fs/block_manager-stress-test.cc +++ b/src/kudu/fs/block_manager-stress-test.cc @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -58,7 +59,6 @@ #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" DECLARE_bool(cache_force_single_shard); DECLARE_double(log_container_excess_space_before_cleanup_fraction); @@ -89,6 +89,7 @@ DEFINE_int32(max_open_files, 32, "Maximum size of the test's file cache"); using std::string; using std::shared_ptr; +using std::thread; using std::unique_ptr; using std::unordered_map; using std::vector; @@ -194,43 +195,33 @@ class BlockManagerStressTest : public KuduTest { SleepFor(MonoDelta::FromSeconds(secs)); LOG(INFO) << "Stopping all threads"; this->StopThreads(); - this->JoinThreads(); this->stop_latch_.Reset(1); + this->threads_.clear(); } void StartThreads() { - scoped_refptr new_thread; for (int i = 0; i < FLAGS_num_writer_threads; i++) { - CHECK_OK(Thread::Create("BlockManagerStressTest", Substitute("writer-$0", i), - &BlockManagerStressTest::WriterThread, this, &new_thread)); - threads_.push_back(new_thread); + threads_.emplace_back([this]() { this->WriterThread(); }); } for (int i = 0; i < FLAGS_num_reader_threads; i++) { - CHECK_OK(Thread::Create("BlockManagerStressTest", Substitute("reader-$0", i), - &BlockManagerStressTest::ReaderThread, this, &new_thread)); - threads_.push_back(new_thread); + threads_.emplace_back([this]() { this->ReaderThread(); }); } for (int i = 0; i < FLAGS_num_deleter_threads; i++) { - CHECK_OK(Thread::Create("BlockManagerStressTest", Substitute("deleter-$0", i), - &BlockManagerStressTest::DeleterThread, this, &new_thread)); - threads_.push_back(new_thread); + threads_.emplace_back([this]() { this->DeleterThread(); }); } } void StopThreads() { stop_latch_.CountDown(); + for (auto& t : threads_) { + t.join(); + } } bool ShouldStop(const MonoDelta& wait_time) { return stop_latch_.WaitFor(wait_time); } - void JoinThreads() { - for (const scoped_refptr& thr : threads_) { - CHECK_OK(ThreadJoiner(thr.get()).Join()); - } - } - void WriterThread(); void ReaderThread(); void DeleterThread(); @@ -277,7 +268,7 @@ class BlockManagerStressTest : public KuduTest { string test_tablet_name_; // The running threads. - vector > threads_; + vector threads_; // Some performance counters. @@ -552,7 +543,7 @@ TYPED_TEST(BlockManagerStressTest, StressTest) { FsReport report; ASSERT_OK(this->bm_->Open(&report)); ASSERT_OK(this->dd_manager_->LoadDataDirGroupFromPB(this->test_tablet_name_, - this->test_group_pb_)); + this->test_group_pb_)); ASSERT_OK(report.LogAndCheckForFatalErrors()); this->RunTest(FLAGS_test_duration_secs / kNumStarts); } diff --git a/src/kudu/fs/block_manager-test.cc b/src/kudu/fs/block_manager-test.cc index 404c591..88143d3 100644 --- a/src/kudu/fs/block_manager-test.cc +++ b/src/kudu/fs/block_manager-test.cc @@ -62,11 +62,11 @@ #include "kudu/util/stopwatch.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" using google::protobuf::util::MessageDifferencer; using std::shared_ptr; using std::string; +using std::thread; using std::unique_ptr; using std::vector; using strings::Substitute; @@ -811,15 +811,14 @@ TYPED_TEST(BlockManagerTest, ConcurrentCloseReadableBlockTest) { unique_ptr reader; ASSERT_OK(this->bm_->OpenBlock(writer->id(), &reader)); - vector > threads; - for (int i = 0; i < 100; i++) { - scoped_refptr t; - ASSERT_OK(Thread::Create("test", Substitute("t$0", i), - &CloseHelper, reader.get(), &t)); - threads.push_back(t); + constexpr int kNumThreads = 100; + vector threads; + threads.reserve(kNumThreads); + for (int i = 0; i < kNumThreads; i++) { + threads.emplace_back([&reader]() { CloseHelper(reader.get()); }); } - for (const scoped_refptr& t : threads) { - t->Join(); + for (auto& t : threads) { + t.join(); } } @@ -1097,8 +1096,10 @@ TYPED_TEST(BlockManagerTest, ConcurrentCloseFinalizedWritableBlockTest) { } }; - vector threads; - for (int i = 0; i < 100; i++) { + constexpr int kNumThreads = 100; + vector threads; + threads.reserve(kNumThreads); + for (int i = 0; i < kNumThreads; i++) { threads.emplace_back(write_data); } for (auto& t : threads) { diff --git a/src/kudu/integration-tests/alter_table-test.cc b/src/kudu/integration-tests/alter_table-test.cc index 5798f99..c3f3507 100644 --- a/src/kudu/integration-tests/alter_table-test.cc +++ b/src/kudu/integration-tests/alter_table-test.cc @@ -27,7 +27,6 @@ #include #include -#include #include #include #include @@ -70,7 +69,6 @@ #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" DECLARE_bool(enable_maintenance_manager); DECLARE_int32(heartbeat_interval_ms); @@ -107,6 +105,7 @@ using std::atomic; using std::map; using std::pair; using std::string; +using std::thread; using std::unique_ptr; using std::vector; @@ -1193,20 +1192,18 @@ TEST_F(AlterTableTest, TestAlterUnderWriteLoad) { // Increase chances of a race between flush and alter. FLAGS_flush_threshold_mb = 3; - scoped_refptr writer; - CHECK_OK(Thread::Create("test", "inserter", - boost::bind(&AlterTableTest::InserterThread, this), - &writer)); + vector threads; + threads.reserve(3); + threads.emplace_back([this]() { this->InserterThread(); }); + threads.emplace_back([this]() { this->UpdaterThread(); }); + threads.emplace_back([this]() { this->ScannerThread(); }); - scoped_refptr updater; - CHECK_OK(Thread::Create("test", "updater", - boost::bind(&AlterTableTest::UpdaterThread, this), - &updater)); - - scoped_refptr scanner; - CHECK_OK(Thread::Create("test", "scanner", - boost::bind(&AlterTableTest::ScannerThread, this), - &scanner)); + SCOPED_CLEANUP({ + stop_threads_ = true; + for (auto& t : threads) { + t.join(); + } + }); // Add columns until we reach 10. for (int i = 2; i < 10; i++) { @@ -1222,10 +1219,6 @@ TEST_F(AlterTableTest, TestAlterUnderWriteLoad) { i)); } - stop_threads_ = true; - writer->Join(); - updater->Join(); - scanner->Join(); // A sanity check: the updater should have generate at least one update // given the parameters the test is running with. CHECK_GE(update_ops_cnt_, 0U); diff --git a/src/kudu/integration-tests/client-stress-test.cc b/src/kudu/integration-tests/client-stress-test.cc index 139f4a7..fbb35bb 100644 --- a/src/kudu/integration-tests/client-stress-test.cc +++ b/src/kudu/integration-tests/client-stress-test.cc @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -35,7 +36,6 @@ #include "kudu/client/shared_ptr.h" #include "kudu/client/value.h" #include "kudu/gutil/port.h" -#include "kudu/gutil/ref_counted.h" #include "kudu/gutil/strings/join.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/integration-tests/cluster_itest_util.h" @@ -49,26 +49,25 @@ #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" METRIC_DECLARE_entity(tablet); METRIC_DECLARE_counter(leader_memory_pressure_rejections); METRIC_DECLARE_counter(follower_memory_pressure_rejections); -using strings::Substitute; +using kudu::client::KuduClient; +using kudu::client::KuduScanner; +using kudu::client::KuduTable; +using kudu::cluster::ExternalMiniCluster; +using kudu::cluster::ExternalMiniClusterOptions; using std::set; using std::string; +using std::thread; using std::unique_ptr; using std::vector; +using strings::Substitute; namespace kudu { -using client::KuduClient; -using client::KuduScanner; -using client::KuduTable; -using cluster::ExternalMiniCluster; -using cluster::ExternalMiniClusterOptions; - class ClientStressTest : public KuduTest { public: virtual void SetUp() OVERRIDE { @@ -154,25 +153,22 @@ TEST_F(ClientStressTest, TestStartScans) { CHECK_OK(cluster_->CreateClient(nullptr, &client)); CountDownLatch go_latch(1); - vector > threads; - const int kNumThreads = 60; + constexpr int kNumThreads = 60; + vector threads; + threads.reserve(kNumThreads); Random rng(run); for (int i = 0; i < kNumThreads; i++) { int32_t start_key = rng.Next32(); - scoped_refptr new_thread; - CHECK_OK(kudu::Thread::Create( - "test", strings::Substitute("test-scanner-$0", i), - &ClientStressTest_TestStartScans_Test::ScannerThread, this, - client.get(), &go_latch, start_key, - &new_thread)); - threads.push_back(new_thread); + threads.emplace_back([this, client, &go_latch, start_key]() { + this->ScannerThread(client.get(), &go_latch, start_key); + }); } SleepFor(MonoDelta::FromMilliseconds(50)); go_latch.CountDown(); - for (const scoped_refptr& thr : threads) { - CHECK_OK(ThreadJoiner(thr.get()).Join()); + for (auto& t : threads) { + t.join(); } } } diff --git a/src/kudu/integration-tests/consistency-itest.cc b/src/kudu/integration-tests/consistency-itest.cc index d6b1ca0..731d6dc 100644 --- a/src/kudu/integration-tests/consistency-itest.cc +++ b/src/kudu/integration-tests/consistency-itest.cc @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -62,7 +63,6 @@ #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" DECLARE_int32(heartbeat_interval_ms); DECLARE_int32(max_clock_sync_error_usec); @@ -80,6 +80,7 @@ using kudu::tablet::TabletReplica; using kudu::tserver::MiniTabletServer; using kudu::tserver::TabletServer; using std::string; +using std::thread; using std::vector; using std::unique_ptr; using strings::Substitute; @@ -827,30 +828,33 @@ class ScanYourWritesMultiClientsParamTest : // replicas that count the rows. The count of the rows // should never go down from the previous observed one. TEST_P(ScanYourWritesMultiClientsParamTest, Test) { + const int kNumThreads = 5; + const int rows_to_insert = 1000; + const int scans_to_perform = AllowSlowTests() ? 10 : 3; const KuduClient::ReplicaSelection sel = GetParam(); + shared_ptr client; ASSERT_OK(cluster_->CreateClient(nullptr, &client)); ASSERT_OK(CreateTable(client.get(), table_name_, 3)); for (int run = 1; run <= 3; run++) { - vector> threads; - const int kNumThreads = 5; - const int rows_to_insert = 1000; - const int scans_to_performs = AllowSlowTests() ? 10 : 3; + vector threads; for (int i = 0; i < kNumThreads; i++) { - Random first_row(rows_to_insert * kNumThreads); - scoped_refptr new_thread; - CHECK_OK(kudu::Thread::Create( - "test", strings::Substitute("test-scanner-$0", i), - &ConsistencyITest::ScannerThread, this, - sel, rows_to_insert, first_row.Next32(), - scans_to_performs, &new_thread)); - threads.push_back(new_thread); + // TODO(adar): this is broken: each call to Next32() yields the same + // value. Fixing it is non-trivial because 'first_row' is multiplied in + // various code paths, causing it to overflow an int32. It just so happens + // that the first Next32() using this particular seed generates a low + // enough value to avoid overflow. + Random rng(rows_to_insert * kNumThreads); + uint32_t first_row = rng.Next32(); + threads.emplace_back([=]() { + this->ScannerThread(sel, rows_to_insert, first_row, scans_to_perform); + }); } SleepFor(MonoDelta::FromMilliseconds(50)); - for (const scoped_refptr &thr : threads) { - CHECK_OK(ThreadJoiner(thr.get()).Join()); + for (auto& t : threads) { + t.join(); } } } diff --git a/src/kudu/integration-tests/create-table-itest.cc b/src/kudu/integration-tests/create-table-itest.cc index 5421a78..276e4fa 100644 --- a/src/kudu/integration-tests/create-table-itest.cc +++ b/src/kudu/integration-tests/create-table-itest.cc @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -39,7 +40,6 @@ #include "kudu/common/schema.h" #include "kudu/common/wire_protocol-test-util.h" #include "kudu/gutil/mathlimits.h" -#include "kudu/gutil/ref_counted.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/integration-tests/cluster_itest_util.h" #include "kudu/integration-tests/external_mini_cluster-itest-base.h" @@ -52,10 +52,10 @@ #include "kudu/util/atomic.h" #include "kudu/util/metrics.h" #include "kudu/util/monotime.h" +#include "kudu/util/scoped_cleanup.h" #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" namespace kudu { class HostPort; @@ -64,6 +64,7 @@ class HostPort; using std::multimap; using std::set; using std::string; +using std::thread; using std::unique_ptr; using std::vector; @@ -477,15 +478,22 @@ TEST_F(CreateTableITest, TestCreateTableWithDeadTServers) { .Create()); // Spin off a bunch of threads that repeatedly look up random key ranges in the table. + constexpr int kNumThreads = 16; AtomicBool quit(false); - vector> threads; - for (int i = 0; i < 16; i++) { - scoped_refptr t; - ASSERT_OK(Thread::Create("test", "lookup_thread", - &LookUpRandomKeysLoop, cluster_->master_proxy(), - kTableName, &quit, &t)); - threads.push_back(t); + vector threads; + threads.reserve(kNumThreads); + for (int i = 0; i < kNumThreads; i++) { + auto proxy = cluster_->master_proxy(); + threads.emplace_back([proxy, kTableName, &quit]() { + LookUpRandomKeysLoop(proxy, kTableName, &quit); + }); } + SCOPED_CLEANUP({ + quit.Store(true); + for (auto& t : threads) { + t.join(); + } + }); // Give the lookup threads some time to crash the master. MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(15); @@ -493,12 +501,6 @@ TEST_F(CreateTableITest, TestCreateTableWithDeadTServers) { ASSERT_TRUE(cluster_->master()->IsProcessAlive()) << "Master crashed!"; SleepFor(MonoDelta::FromMilliseconds(100)); } - - quit.Store(true); - - for (const auto& t : threads) { - t->Join(); - } } } // namespace kudu diff --git a/src/kudu/integration-tests/exactly_once_writes-itest.cc b/src/kudu/integration-tests/exactly_once_writes-itest.cc index 53b19d1..f356100 100644 --- a/src/kudu/integration-tests/exactly_once_writes-itest.cc +++ b/src/kudu/integration-tests/exactly_once_writes-itest.cc @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -32,7 +33,6 @@ #include "kudu/common/wire_protocol.h" #include "kudu/common/wire_protocol.pb.h" #include "kudu/gutil/map-util.h" -#include "kudu/gutil/ref_counted.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/integration-tests/cluster_itest_util.h" #include "kudu/integration-tests/log_verifier.h" @@ -44,21 +44,23 @@ #include "kudu/tserver/tserver.pb.h" #include "kudu/tserver/tserver_service.proxy.h" #include "kudu/util/barrier.h" +#include "kudu/util/countdown_latch.h" #include "kudu/util/logging.h" #include "kudu/util/monotime.h" #include "kudu/util/net/sockaddr.h" #include "kudu/util/pb_util.h" #include "kudu/util/random.h" +#include "kudu/util/scoped_cleanup.h" #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" DECLARE_int32(consensus_rpc_timeout_ms); DECLARE_int32(num_replicas); DECLARE_int32(num_tablet_servers); using std::string; +using std::thread; using std::unique_ptr; using std::vector; @@ -203,40 +205,33 @@ void ExactlyOnceSemanticsITest::DoTestWritesWithExactlyOnceSemantics( const int num_threads = FLAGS_num_replicas * kNumThreadsPerReplica; vector> responses(num_threads); Barrier barrier(num_threads); - vector> threads; + CountDownLatch threads_running(num_threads); + vector threads; threads.reserve(num_threads); // Create kNumThreadsPerReplica write threads per replica. for (int i = 0; i < num_threads; i++) { - int thread_idx = i; - Sockaddr address = cluster_->tablet_server( - thread_idx % FLAGS_num_replicas)->bound_rpc_addr(); - string worker_name = strings::Substitute( - "writer-$0-$1", thread_idx, address.ToString()); - - scoped_refptr thread; - ASSERT_OK(kudu::Thread::Create( - "TestWritesWithExactlyOnceSemantics", - worker_name, - &ExactlyOnceSemanticsITest::WriteRowsAndCollectResponses, - this, - address, - thread_idx, - num_batches, - &barrier, - &responses[i], - &thread)); - threads.emplace_back(thread); + Sockaddr address = cluster_->tablet_server(i % FLAGS_num_replicas)->bound_rpc_addr(); + auto& my_responses = responses[i]; + threads.emplace_back([this, address, i, num_batches, + &barrier, &my_responses, &threads_running]() { + this->WriteRowsAndCollectResponses(address, i, num_batches, &barrier, &my_responses); + threads_running.CountDown(); + }); } - bool done = false; - while (!done) { - done = true; - for (auto& thread : threads) { - if (ThreadJoiner(thread.get()).give_up_after_ms(0).Join().IsAborted()) { - done = false; - break; - } + auto thread_join_func = [&]() { + for (auto& t : threads) { + t.join(); + } + }; + auto thread_cleanup = MakeScopedCleanup(thread_join_func); + + while (true) { + if (!threads_running.count()) { + thread_join_func(); + thread_cleanup.cancel(); + break; } if (allow_crashes) { RestartAnyCrashedTabletServers(); diff --git a/src/kudu/integration-tests/full_stack-insert-scan-test.cc b/src/kudu/integration-tests/full_stack-insert-scan-test.cc index 98a15a0..6ea7ef9 100644 --- a/src/kudu/integration-tests/full_stack-insert-scan-test.cc +++ b/src/kudu/integration-tests/full_stack-insert-scan-test.cc @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -42,7 +43,6 @@ #include "kudu/gutil/port.h" #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/strings/split.h" -#include "kudu/gutil/strings/strcat.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/master/mini_master.h" #include "kudu/mini-cluster/internal_mini_cluster.h" @@ -99,6 +99,7 @@ using kudu::client::KuduTableCreator; using kudu::cluster::InternalMiniCluster; using kudu::cluster::InternalMiniClusterOptions; using std::string; +using std::thread; using std::unique_ptr; using std::vector; using strings::Split; @@ -285,24 +286,23 @@ TEST_F(FullStackInsertScanTest, WithDiskStressTest) { } void FullStackInsertScanTest::DoConcurrentClientInserts() { - vector > threads(kNumInsertClients); + vector threads; + threads.reserve(kNumInsertClients); CountDownLatch start_latch(kNumInsertClients + 1); for (int i = 0; i < kNumInsertClients; ++i) { NO_FATALS(CreateNewClient(i)); - ASSERT_OK(Thread::Create(CURRENT_TEST_NAME(), - StrCat(CURRENT_TEST_CASE_NAME(), "-id", i), - &FullStackInsertScanTest::InsertRows, this, - &start_latch, i, random_.Next(), &threads[i])); + uint32_t seed = random_.Next32(); + threads.emplace_back([this, &start_latch, i, seed]() { + this->InsertRows(&start_latch, i, seed); + }); start_latch.CountDown(); } LOG_TIMING(INFO, strings::Substitute("concurrent inserts ($0 rows, $1 threads)", kNumRows, kNumInsertClients)) { start_latch.CountDown(); - for (const scoped_refptr& thread : threads) { - ASSERT_OK(ThreadJoiner(thread.get()) - .warn_every_ms(15000) - .Join()); + for (auto& t : threads) { + t.join(); } } } @@ -431,7 +431,7 @@ void FullStackInsertScanTest::RandomRow(Random* rng, KuduPartialRow* row, char* buf[len] = '\0'; CHECK_OK(row->SetStringCopy(kStrCol, buf)); CHECK_OK(row->SetInt32(kInt32ColBase, id)); - CHECK_OK(row->SetInt64(kInt64ColBase, Thread::current_thread()->tid())); + CHECK_OK(row->SetInt64(kInt64ColBase, Thread::CurrentThreadId())); for (int i = 1; i < kNumIntCols; ++i) { CHECK_OK(row->SetInt32(kInt32ColBase + i, rng->Next32())); CHECK_OK(row->SetInt64(kInt64ColBase + i, rng->Next64())); diff --git a/src/kudu/integration-tests/linked_list-test-util.h b/src/kudu/integration-tests/linked_list-test-util.h index 1461a2a..b92ac2f 100644 --- a/src/kudu/integration-tests/linked_list-test-util.h +++ b/src/kudu/integration-tests/linked_list-test-util.h @@ -14,7 +14,6 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. - #pragma once #include @@ -23,9 +22,12 @@ #include #include #include +#include #include #include +#include +#include #include #include "kudu/client/client-test-util.h" @@ -47,7 +49,6 @@ #include "kudu/util/monotime.h" #include "kudu/util/random.h" #include "kudu/util/stopwatch.h" -#include "kudu/util/thread.h" namespace kudu { @@ -267,16 +268,13 @@ class ScopedRowUpdater { // the lifetime of this object. explicit ScopedRowUpdater(client::KuduTable* table) : table_(table), - to_update_(kint64max) { // no limit - CHECK_OK(Thread::Create("linked_list-test", "updater", - &ScopedRowUpdater::RowUpdaterThread, this, &updater_)); + to_update_(kint64max), // no limit + updater_([this]() { this->RowUpdaterThread(); }) { } ~ScopedRowUpdater() { to_update_.Shutdown(); - if (updater_) { - updater_->Join(); - } + updater_.join(); } BlockingQueue* to_update() { return &to_update_; } @@ -300,7 +298,7 @@ class ScopedRowUpdater { client::KuduTable* table_; BlockingQueue to_update_; - scoped_refptr updater_; + std::thread updater_; }; // Helper class to hold results from a linked list scan and perform the diff --git a/src/kudu/integration-tests/master_replication-itest.cc b/src/kudu/integration-tests/master_replication-itest.cc index d61ae91..cf6ed9c 100644 --- a/src/kudu/integration-tests/master_replication-itest.cc +++ b/src/kudu/integration-tests/master_replication-itest.cc @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -53,10 +54,10 @@ #include "kudu/util/net/net_util.h" #include "kudu/util/net/sockaddr.h" #include "kudu/util/pb_util.h" +#include "kudu/util/scoped_cleanup.h" #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" DECLARE_bool(raft_prepare_replacement_before_eviction); @@ -77,6 +78,7 @@ using kudu::cluster::InternalMiniClusterOptions; using kudu::consensus::ReplicaManagementInfoPB; using kudu::itest::GetInt64Metric; using std::string; +using std::thread; using std::unique_ptr; using std::vector; using strings::Substitute; @@ -130,13 +132,11 @@ class MasterReplicationTest : public KuduTest { Status ConnectToClusterDuringStartup(const vector& master_addrs) { // Shut the cluster down and ... cluster_->Shutdown(); - // ... start the cluster after a delay. - scoped_refptr start_thread; - RETURN_NOT_OK(Thread::Create("TestCycleThroughAllMasters", "start_thread", - &MasterReplicationTest::StartClusterDelayed, - this, - 1000, // start after 1000 millis. - &start_thread)); + // ... start the cluster after a 1000 ms delay. + thread start_thread([this]() { this->StartClusterDelayed(1000); }); + SCOPED_CLEANUP({ + start_thread.join(); + }); // The timeouts for both RPCs and operations are increased to cope with slow // clusters (i.e. TSAN builds). @@ -145,10 +145,7 @@ class MasterReplicationTest : public KuduTest { builder.master_server_addrs(master_addrs); builder.default_admin_operation_timeout(MonoDelta::FromSeconds(90)); builder.default_rpc_timeout(MonoDelta::FromSeconds(15)); - Status s = builder.Build(&client); - - RETURN_NOT_OK(ThreadJoiner(start_thread.get()).Join()); - return s; + return builder.Build(&client); } Status CreateClient(shared_ptr* out) { diff --git a/src/kudu/integration-tests/raft_consensus-itest-base.cc b/src/kudu/integration-tests/raft_consensus-itest-base.cc index fe238c7..73d2613 100644 --- a/src/kudu/integration-tests/raft_consensus-itest-base.cc +++ b/src/kudu/integration-tests/raft_consensus-itest-base.cc @@ -62,10 +62,10 @@ DEFINE_int32(num_client_threads, 8, "Number of client threads to launch"); -DEFINE_int64(client_inserts_per_thread, 50, +DEFINE_int32(client_inserts_per_thread, 50, "Number of rows inserted by each client thread"); DECLARE_int32(consensus_rpc_timeout_ms); -DEFINE_int64(client_num_batches_per_thread, 5, +DEFINE_int32(client_num_batches_per_thread, 5, "In how many batches to group the rows, for each client"); METRIC_DECLARE_entity(tablet); @@ -136,10 +136,8 @@ void RaftConsensusITestBase::ScanReplica(TabletServerServiceProxy* replica_proxy } void RaftConsensusITestBase::InsertTestRowsRemoteThread( - uint64_t first_row, - uint64_t count, - uint64_t num_batches, - const vector& latches) { + int first_row, int count, int num_batches, + const vector>& latches) { shared_ptr table; CHECK_OK(client_->OpenTable(kTableId, &table)); @@ -148,8 +146,8 @@ void RaftConsensusITestBase::InsertTestRowsRemoteThread( CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); for (int i = 0; i < num_batches; i++) { - uint64_t first_row_in_batch = first_row + (i * count / num_batches); - uint64_t last_row_in_batch = first_row_in_batch + count / num_batches; + int first_row_in_batch = first_row + (i * count / num_batches); + int last_row_in_batch = first_row_in_batch + count / num_batches; for (int j = first_row_in_batch; j < last_row_in_batch; j++) { unique_ptr insert(table->NewInsert()); @@ -163,7 +161,7 @@ void RaftConsensusITestBase::InsertTestRowsRemoteThread( FlushSessionOrDie(session); int inserted = last_row_in_batch - first_row_in_batch; - for (CountDownLatch* latch : latches) { + for (const auto& latch : latches) { latch->CountDown(inserted); } } diff --git a/src/kudu/integration-tests/raft_consensus-itest-base.h b/src/kudu/integration-tests/raft_consensus-itest-base.h index b3c843d..dff366f 100644 --- a/src/kudu/integration-tests/raft_consensus-itest-base.h +++ b/src/kudu/integration-tests/raft_consensus-itest-base.h @@ -14,10 +14,10 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. - #pragma once #include +#include #include #include @@ -57,10 +57,9 @@ class RaftConsensusITestBase : public TabletServerIntegrationTestBase { void ScanReplica(TabletServerServiceProxy* replica_proxy, std::vector* results); - void InsertTestRowsRemoteThread(uint64_t first_row, - uint64_t count, - uint64_t num_batches, - const std::vector& latches); + void InsertTestRowsRemoteThread( + int first_row, int count, int num_batches, + const std::vector>& latches = {}); protected: // Retrieve the current term of the first tablet on this tablet server. static Status GetTermMetricValue(cluster::ExternalTabletServer* ts, diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc index 8c22508..b52c8e6 100644 --- a/src/kudu/integration-tests/raft_consensus-itest.cc +++ b/src/kudu/integration-tests/raft_consensus-itest.cc @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -47,7 +48,6 @@ #include "kudu/gutil/basictypes.h" #include "kudu/gutil/map-util.h" #include "kudu/gutil/ref_counted.h" -#include "kudu/gutil/stl_util.h" #include "kudu/gutil/strings/strcat.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/gutil/strings/util.h" @@ -82,11 +82,10 @@ #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" DECLARE_bool(raft_prepare_replacement_before_eviction); -DECLARE_int64(client_inserts_per_thread); -DECLARE_int64(client_num_batches_per_thread); +DECLARE_int32(client_inserts_per_thread); +DECLARE_int32(client_num_batches_per_thread); DECLARE_int32(consensus_rpc_timeout_ms); DECLARE_int32(num_client_threads); DECLARE_int32(num_replicas); @@ -146,6 +145,7 @@ using kudu::rpc::RpcController; using kudu::server::SetFlagRequestPB; using kudu::server::SetFlagResponsePB; using std::string; +using std::thread; using std::unique_ptr; using std::unordered_map; using std::vector; @@ -229,7 +229,7 @@ class RaftConsensusITest : public RaftConsensusITestBase { protected: shared_ptr table_; - vector > threads_; + vector threads_; }; int64_t RaftConsensusITest::GetTimestampOnServer(TServerDetails* tserver) const { @@ -676,8 +676,7 @@ TEST_F(RaftConsensusITest, TestInsertAndMutateThroughConsensus) { for (int i = 0; i < num_iters; i++) { InsertTestRowsRemoteThread(i * FLAGS_client_inserts_per_thread, FLAGS_client_inserts_per_thread, - FLAGS_client_num_batches_per_thread, - vector()); + FLAGS_client_num_batches_per_thread); } NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * num_iters)); } @@ -738,27 +737,20 @@ TEST_F(RaftConsensusITest, MultiThreadedMutateAndInsertThroughConsensus) { int num_threads = FLAGS_num_client_threads; for (int i = 0; i < num_threads; i++) { - scoped_refptr new_thread; - CHECK_OK(kudu::Thread::Create("test", Substitute("ts-test$0", i), - &RaftConsensusITest::InsertTestRowsRemoteThread, - this, i * FLAGS_client_inserts_per_thread, - FLAGS_client_inserts_per_thread, - FLAGS_client_num_batches_per_thread, - vector(), - &new_thread)); - threads_.push_back(new_thread); + threads_.emplace_back([this, i]() { + this->InsertTestRowsRemoteThread(i * FLAGS_client_inserts_per_thread, + FLAGS_client_inserts_per_thread, + FLAGS_client_num_batches_per_thread); + }); } for (int i = 0; i < FLAGS_num_replicas; i++) { - scoped_refptr new_thread; - CHECK_OK(kudu::Thread::Create("test", Substitute("chaos-test$0", i), - &RaftConsensusITest::DelayInjectorThread, - this, cluster_->tablet_server(i), - kConsensusRpcTimeoutForTests, - &new_thread)); - threads_.push_back(new_thread); + auto* ts = cluster_->tablet_server(i); + threads_.emplace_back([this, ts]() { + this->DelayInjectorThread(ts, kConsensusRpcTimeoutForTests); + }); } - for (scoped_refptr thr : threads_) { - CHECK_OK(ThreadJoiner(thr.get()).Join()); + for (auto& t : threads_) { + t.join(); } NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * FLAGS_num_client_threads)); @@ -1010,39 +1002,35 @@ TEST_F(RaftConsensusITest, MultiThreadedInsertWithFailovers) { Substitute("$0", (FLAGS_client_num_batches_per_thread * 100))); int num_threads = FLAGS_num_client_threads; - int64_t total_num_rows = num_threads * FLAGS_client_inserts_per_thread; + int total_num_rows = num_threads * FLAGS_client_inserts_per_thread; // We create 2 * (kNumReplicas - 1) latches so that we kill the same node at least // twice. - vector latches; + vector> latches; + latches.reserve(kNumElections); for (int i = 1; i < kNumElections; i++) { - latches.push_back(new CountDownLatch((i * total_num_rows) / kNumElections)); + latches.emplace_back(new CountDownLatch((i * total_num_rows) / kNumElections)); } for (int i = 0; i < num_threads; i++) { - scoped_refptr new_thread; - CHECK_OK(kudu::Thread::Create("test", Substitute("ts-test$0", i), - &RaftConsensusITest::InsertTestRowsRemoteThread, - this, i * FLAGS_client_inserts_per_thread, - FLAGS_client_inserts_per_thread, - FLAGS_client_num_batches_per_thread, - latches, - &new_thread)); - threads_.push_back(new_thread); - } - - for (const auto* latch : latches) { + threads_.emplace_back([this, i, &latches]() { + this->InsertTestRowsRemoteThread(i * FLAGS_client_inserts_per_thread, + FLAGS_client_inserts_per_thread, + FLAGS_client_num_batches_per_thread, latches); + }); + } + + for (const auto& latch : latches) { NO_FATALS(cluster_->AssertNoCrashes()); latch->Wait(); StopOrKillLeaderAndElectNewOne(); } - for (const auto& thr : threads_) { - CHECK_OK(ThreadJoiner(thr.get()).Join()); + for (auto& t : threads_) { + t.join(); } NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * FLAGS_num_client_threads)); - STLDeleteElements(&latches); } // Regression test for KUDU-597, an issue where we could mis-order operations on @@ -1067,12 +1055,16 @@ TEST_F(RaftConsensusITest, TestKUDU_597) { AtomicBool finish(false); for (int i = 0; i < FLAGS_num_tablet_servers; i++) { - scoped_refptr new_thread; - CHECK_OK(kudu::Thread::Create("test", Substitute("ts-test$0", i), - &RaftConsensusITest::StubbornlyWriteSameRowThread, - this, i, &finish, &new_thread)); - threads_.push_back(new_thread); + threads_.emplace_back([this, i, &finish]() { + this->StubbornlyWriteSameRowThread(i, &finish); + }); } + SCOPED_CLEANUP({ + finish.Store(true); + for (auto& t : threads_) { + t.join(); + } + }); const int num_loops = AllowSlowTests() ? 10 : 1; for (int i = 0; i < num_loops; i++) { @@ -1080,11 +1072,6 @@ TEST_F(RaftConsensusITest, TestKUDU_597) { SleepFor(MonoDelta::FromSeconds(1)); ASSERT_OK(CheckTabletServersAreAlive(FLAGS_num_tablet_servers)); } - - finish.Store(true); - for (scoped_refptr thr : threads_) { - CHECK_OK(ThreadJoiner(thr.get()).Join()); - } } // Regression test for KUDU-1775: when a replica is restarted, and the first @@ -1551,7 +1538,7 @@ TEST_F(RaftConsensusITest, TestReplaceChangeConfigOperation) { ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[0]->uuid())->Resume()); // Insert some data and verify that it propagates to all servers. - NO_FATALS(InsertTestRowsRemoteThread(0, 10, 1, vector())); + NO_FATALS(InsertTestRowsRemoteThread(0, 10, 1)); NO_FATALS(AssertAllReplicasAgree(10)); // Try another config change. @@ -1559,7 +1546,7 @@ TEST_F(RaftConsensusITest, TestReplaceChangeConfigOperation) { // config change didn't properly unset the 'pending' configuration. ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, tservers[2], MonoDelta::FromSeconds(5), -1)); - NO_FATALS(InsertTestRowsRemoteThread(10, 10, 1, vector())); + NO_FATALS(InsertTestRowsRemoteThread(10, 10, 1)); } // Test the atomic CAS arguments to ChangeConfig() add server and remove server. @@ -1675,30 +1662,19 @@ TEST_F(RaftConsensusITest, TestConfigChangeUnderLoad) { // Start a write workload. LOG(INFO) << "Starting write workload..."; - vector> threads; + vector threads; + threads.reserve(FLAGS_num_client_threads); AtomicInt rows_inserted(0); AtomicBool finish(false); - const auto num_threads = FLAGS_num_client_threads; - for (auto i = 0; i < num_threads; i++) { - scoped_refptr thread; - ASSERT_OK(Thread::Create(CURRENT_TEST_NAME(), Substitute("row-writer-$0", i), - &DoWriteTestRows, - leader_tserver, tablet_id_, kTimeout, - &rows_inserted, &finish, - &thread)); - threads.push_back(thread); + for (auto i = 0; i < FLAGS_num_client_threads; i++) { + threads.emplace_back([this, leader_tserver, kTimeout, &rows_inserted, &finish]() { + DoWriteTestRows(leader_tserver, this->tablet_id_, kTimeout, &rows_inserted, &finish); + }); } auto thread_join_func = [&]() { - Status ret; - for (const auto& thread : threads) { - Status s = ThreadJoiner(thread.get()).Join(); - if (!s.ok()) { - LOG(WARNING) << "failed to join thread " << thread->name() - << " : " << s.ToString(); - ret = ret.ok() ? s : ret.CloneAndAppend(s.ToString()); - } + for (auto& t : threads) { + t.join(); } - return ret; }; auto thread_joiner = MakeScopedCleanup(thread_join_func); @@ -1747,7 +1723,7 @@ TEST_F(RaftConsensusITest, TestConfigChangeUnderLoad) { LOG(INFO) << "Joining writer threads..."; finish.Store(true); - ASSERT_OK(thread_join_func()); + thread_join_func(); thread_joiner.cancel(); LOG(INFO) << "Waiting for replicas to agree..."; diff --git a/src/kudu/integration-tests/raft_consensus_election-itest.cc b/src/kudu/integration-tests/raft_consensus_election-itest.cc index c78c3f7..78fab3b 100644 --- a/src/kudu/integration-tests/raft_consensus_election-itest.cc +++ b/src/kudu/integration-tests/raft_consensus_election-itest.cc @@ -45,6 +45,7 @@ #include "kudu/tablet/metadata.pb.h" #include "kudu/tserver/tablet_server-test-base.h" #include "kudu/tserver/tserver.pb.h" +#include "kudu/util/countdown_latch.h" #include "kudu/util/metrics.h" #include "kudu/util/monotime.h" #include "kudu/util/pb_util.h" @@ -53,8 +54,8 @@ #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -DECLARE_int64(client_inserts_per_thread); -DECLARE_int64(client_num_batches_per_thread); +DECLARE_int32(client_inserts_per_thread); +DECLARE_int32(client_num_batches_per_thread); DECLARE_int32(consensus_rpc_timeout_ms); DECLARE_int32(num_client_threads); DECLARE_int32(num_replicas); diff --git a/src/kudu/integration-tests/tablet_copy_client_session-itest.cc b/src/kudu/integration-tests/tablet_copy_client_session-itest.cc index 8b28d37..b1a1c4d 100644 --- a/src/kudu/integration-tests/tablet_copy_client_session-itest.cc +++ b/src/kudu/integration-tests/tablet_copy_client_session-itest.cc @@ -31,7 +31,6 @@ #include "kudu/common/wire_protocol.h" #include "kudu/common/wire_protocol.pb.h" -#include "kudu/gutil/ref_counted.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/integration-tests/cluster_itest_util.h" #include "kudu/integration-tests/external_mini_cluster-itest-base.h" @@ -45,10 +44,10 @@ #include "kudu/util/monotime.h" #include "kudu/util/net/net_util.h" #include "kudu/util/path_util.h" +#include "kudu/util/scoped_cleanup.h" #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" METRIC_DECLARE_gauge_uint64(tablets_num_failed); @@ -146,10 +145,10 @@ TEST_F(TabletCopyClientSessionITest, TestStartTabletCopyWhileSourceBootstrapping cluster_->tablet_server(0)->Shutdown(); const int kNumStartTabletThreads = 4; - vector> threads; + vector threads; + threads.reserve(kNumStartTabletThreads); for (int j = 0; j < kNumStartTabletThreads; j++) { - scoped_refptr t; - CHECK_OK(Thread::Create("test", "start-tablet-copy", [&]() { + threads.emplace_back([&]() { // Retry until it succeeds (we will intially race against TS 0 startup). MonoTime deadline = MonoTime::Now() + kTimeout; Status s; @@ -172,19 +171,22 @@ TEST_F(TabletCopyClientSessionITest, TestStartTabletCopyWhileSourceBootstrapping } // If we got here, we either successfully started a tablet copy or we // observed the tablet running. - }, &t)); - threads.push_back(t); + }); } - // Restart the source tablet server (TS 0). - ASSERT_OK(cluster_->tablet_server(0)->Restart()); + { + SCOPED_CLEANUP({ + for (auto& t : threads) { + t.join(); + } + }); - // Wait for one of the threads to succeed with its tablet copy and for the - // tablet to be running on TS 1. - ASSERT_OK(WaitUntilTabletRunning(ts1, tablet_id, kTimeout)); + // Restart the source tablet server (TS 0). + ASSERT_OK(cluster_->tablet_server(0)->Restart()); - for (auto& t : threads) { - t->Join(); + // Wait for one of the threads to succeed with its tablet copy and for the + // tablet to be running on TS 1. + ASSERT_OK(WaitUntilTabletRunning(ts1, tablet_id, kTimeout)); } NO_FATALS(cluster_->AssertNoCrashes()); diff --git a/src/kudu/integration-tests/ts_recovery-itest.cc b/src/kudu/integration-tests/ts_recovery-itest.cc index f0f4e7e..6b85f67 100644 --- a/src/kudu/integration-tests/ts_recovery-itest.cc +++ b/src/kudu/integration-tests/ts_recovery-itest.cc @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -75,7 +76,6 @@ #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" METRIC_DECLARE_gauge_uint64(tablets_num_failed); @@ -103,6 +103,7 @@ using kudu::log::LogOptions; using kudu::tablet::TabletMetadata; using kudu::tablet::TabletSuperBlockPB; using std::string; +using std::thread; using std::unique_ptr; using std::vector; using strings::Substitute; @@ -717,11 +718,9 @@ class UpdaterThreads { void Start() { CHECK(!should_run_.Load()); should_run_.Store(true); - threads_.resize(kNumThreads); - for (int i = 0; i < threads_.size(); i++) { - CHECK_OK(kudu::Thread::Create("test", "updater", - &UpdaterThreads::Run, this, - &threads_[i])); + threads_.reserve(kNumThreads); + for (int i = 0; i < kNumThreads; i++) { + threads_.emplace_back([this]() { this->Run(); }); } } @@ -730,10 +729,9 @@ class UpdaterThreads { CHECK(should_run_.Load()); should_run_.Store(false); - for (const auto& t : threads_) { - t->Join(); + for (auto& t : threads_) { + t.join(); } - threads_.clear(); } protected: @@ -760,7 +758,7 @@ class UpdaterThreads { AtomicInt* inserted_; shared_ptr client_; shared_ptr table_; - vector > threads_; + vector threads_; }; // Parameterized test which acts as a regression test for KUDU-969. diff --git a/src/kudu/integration-tests/update_scan_delta_compact-test.cc b/src/kudu/integration-tests/update_scan_delta_compact-test.cc index f16af36..7f9fdeb 100644 --- a/src/kudu/integration-tests/update_scan_delta_compact-test.cc +++ b/src/kudu/integration-tests/update_scan_delta_compact-test.cc @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -35,8 +36,6 @@ #include "kudu/client/write_op.h" #include "kudu/common/partial_row.h" #include "kudu/gutil/port.h" -#include "kudu/gutil/ref_counted.h" -#include "kudu/gutil/strings/strcat.h" #include "kudu/master/mini_master.h" #include "kudu/mini-cluster/internal_mini_cluster.h" #include "kudu/tserver/mini_tablet_server.h" @@ -50,7 +49,6 @@ #include "kudu/util/stopwatch.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" DECLARE_int32(flush_threshold_mb); DECLARE_int32(log_segment_size_mb); @@ -79,6 +77,7 @@ using kudu::client::sp::shared_ptr; using kudu::cluster::InternalMiniCluster; using kudu::cluster::InternalMiniClusterOptions; using std::string; +using std::thread; using std::unique_ptr; using std::vector; @@ -220,44 +219,17 @@ void UpdateScanDeltaCompactionTest::InsertBaseData() { } void UpdateScanDeltaCompactionTest::RunThreads() { - vector > threads; - + vector threads; CountDownLatch stop_latch(1); - { - scoped_refptr t; - ASSERT_OK(Thread::Create(CURRENT_TEST_NAME(), - StrCat(CURRENT_TEST_CASE_NAME(), "-update"), - &UpdateScanDeltaCompactionTest::UpdateRows, this, - &stop_latch, &t)); - threads.push_back(t); - } - - { - scoped_refptr t; - ASSERT_OK(Thread::Create(CURRENT_TEST_NAME(), - StrCat(CURRENT_TEST_CASE_NAME(), "-scan"), - &UpdateScanDeltaCompactionTest::ScanRows, this, - &stop_latch, &t)); - threads.push_back(t); - } - - { - scoped_refptr t; - ASSERT_OK(Thread::Create(CURRENT_TEST_NAME(), - StrCat(CURRENT_TEST_CASE_NAME(), "-curl"), - &UpdateScanDeltaCompactionTest::CurlWebPages, this, - &stop_latch, &t)); - threads.push_back(t); - } + threads.emplace_back([this, &stop_latch]() { this->UpdateRows(&stop_latch); }); + threads.emplace_back([this, &stop_latch]() { this->ScanRows(&stop_latch); }); + threads.emplace_back([this, &stop_latch]() { this->CurlWebPages(&stop_latch); }); SleepFor(MonoDelta::FromSeconds(FLAGS_seconds_to_run * 1.0)); stop_latch.CountDown(); - - for (const scoped_refptr& thread : threads) { - ASSERT_OK(ThreadJoiner(thread.get()) - .warn_every_ms(500) - .Join()); + for (auto& t : threads) { + t.join(); } } diff --git a/src/kudu/master/hms_notification_log_listener.cc b/src/kudu/master/hms_notification_log_listener.cc index a401d1c..d310e40 100644 --- a/src/kudu/master/hms_notification_log_listener.cc +++ b/src/kudu/master/hms_notification_log_listener.cc @@ -133,6 +133,7 @@ void HmsNotificationLogListenerTask::RunLoop() { for (auto& cb : callback_batch) { cb.Run(s); } + callback_batch.clear(); { std::lock_guard l(lock_); @@ -152,7 +153,7 @@ void HmsNotificationLogListenerTask::RunLoop() { // Swap the current queue of callbacks, so they can be completed after // polling next iteration. - callback_batch = std::move(catch_up_callbacks_); + callback_batch.swap(catch_up_callbacks_); // Check if shutdown was signaled while waiting. if (closing_) { diff --git a/src/kudu/mini-cluster/webui_checker.cc b/src/kudu/mini-cluster/webui_checker.cc index 0d97bbb..e195093 100644 --- a/src/kudu/mini-cluster/webui_checker.cc +++ b/src/kudu/mini-cluster/webui_checker.cc @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -34,9 +35,9 @@ #include "kudu/util/monotime.h" #include "kudu/util/net/net_util.h" #include "kudu/util/status.h" -#include "kudu/util/thread.h" using std::string; +using std::thread; using std::vector; using strings::Substitute; @@ -79,16 +80,13 @@ PeriodicWebUIChecker::PeriodicWebUIChecker( std::random_device rdev; std::mt19937 gen(rdev()); std::shuffle(urls_.begin(), urls_.end(), gen); - CHECK_OK(Thread::Create("test", "webui-checker", - &PeriodicWebUIChecker::CheckThread, this, &checker_)); + checker_ = thread([this]() { this->CheckThread(); }); } PeriodicWebUIChecker::~PeriodicWebUIChecker() { + LOG(INFO) << "shutting down CURL thread"; is_running_ = false; - if (checker_) { - LOG(INFO) << "shutting down CURL thread"; - checker_->Join(); - } + checker_.join(); } void PeriodicWebUIChecker::CheckThread() { diff --git a/src/kudu/mini-cluster/webui_checker.h b/src/kudu/mini-cluster/webui_checker.h index 6127f13..f9a46ee 100644 --- a/src/kudu/mini-cluster/webui_checker.h +++ b/src/kudu/mini-cluster/webui_checker.h @@ -14,18 +14,16 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. - #pragma once #include #include +#include #include -#include "kudu/gutil/ref_counted.h" #include "kudu/util/monotime.h" namespace kudu { -class Thread; namespace cluster { class ExternalMiniCluster; } // namespace cluster @@ -64,7 +62,7 @@ class PeriodicWebUIChecker { const MonoDelta period_; std::atomic is_running_; - scoped_refptr checker_; + std::thread checker_; std::vector urls_; }; diff --git a/src/kudu/rpc/exactly_once_rpc-test.cc b/src/kudu/rpc/exactly_once_rpc-test.cc index c94e89c..986ac69 100644 --- a/src/kudu/rpc/exactly_once_rpc-test.cc +++ b/src/kudu/rpc/exactly_once_rpc-test.cc @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -50,7 +51,6 @@ #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" DECLARE_int64(remember_clients_ttl_ms); DECLARE_int64(remember_responses_ttl_ms); @@ -60,6 +60,7 @@ using kudu::pb_util::SecureDebugString; using kudu::pb_util::SecureShortDebugString; using std::atomic_int; using std::shared_ptr; +using std::thread; using std::unique_ptr; using std::vector; @@ -218,33 +219,30 @@ class ExactlyOnceRpcTest : public RpcTestBase { const scoped_refptr& request_tracker, shared_ptr messenger, int value, - int server_sleep = 0) : latch_(1) { + int server_sleep = 0) : latch(1) { MonoTime now = MonoTime::Now(); now.AddDelta(MonoDelta::FromMilliseconds(10000)); - rpc_ = new CalculatorServiceRpc(server_picker, - request_tracker, - now, - std::move(messenger), - value, - &latch_, - server_sleep); + rpc = new CalculatorServiceRpc(server_picker, + request_tracker, + now, + std::move(messenger), + value, + &latch, + server_sleep); } void Start() { - CHECK_OK(kudu::Thread::Create( - "test", - "test", - &RetriableRpcExactlyOnceAdder::SleepAndSend, this, &thread)); + thr = thread([this]() { this->SleepAndSend(); }); } void SleepAndSend() { - rpc_->SendRpc(); - latch_.Wait(); + rpc->SendRpc(); + latch.Wait(); } - CountDownLatch latch_; - scoped_refptr thread; - CalculatorServiceRpc* rpc_; + CountDownLatch latch; + thread thr; + CalculatorServiceRpc* rpc; }; // An exactly once adder that sends multiple, simultaneous calls, to the server @@ -264,10 +262,7 @@ class ExactlyOnceRpcTest : public RpcTestBase { } void Start() { - CHECK_OK(kudu::Thread::Create( - "test", - "test", - &SimultaneousExactlyOnceAdder::SleepAndSend, this, &thread)); + thr = thread([this]() { this->SleepAndSend(); }); } // Sleeps the preset number of msecs before sending the call. @@ -282,7 +277,7 @@ class ExactlyOnceRpcTest : public RpcTestBase { RpcController controller; ExactlyOnceRequestPB req; ExactlyOnceResponsePB resp; - scoped_refptr thread; + thread thr; }; @@ -314,7 +309,7 @@ class ExactlyOnceRpcTest : public RpcTestBase { // This thread is used in the stress test where we're constantly running GC. // So, once we get a "success" response, it's likely that the result will be // GCed on the server side, and thus it's not safe to spuriously retry. - adder->rpc_->sometimes_retry_successful_ = false; + adder->rpc->sometimes_retry_successful_ = false; adder->SleepAndSend(); SleepFor(MonoDelta::FromMilliseconds(rand() % 10)); counter++; @@ -471,7 +466,7 @@ TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsWithReplicatedRpc) { count += j; } for (int j = 0; j < kNumRpcs; j++) { - CHECK_OK(ThreadJoiner(adders[j]->thread.get()).Join()); + adders[j]->thr.join(); } CheckValueMatches(count); } @@ -513,7 +508,7 @@ TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsWithConcurrentUpdaters) { } uint64_t time_micros = 0; for (int j = 0; j < kNumThreads; j++) { - CHECK_OK(ThreadJoiner(adders[j]->thread.get()).Join()); + adders[j]->thr.join(); ASSERT_EQ(adders[j]->resp.current_val(), i + 1); if (time_micros == 0) { time_micros = adders[j]->resp.current_time_micros(); @@ -602,18 +597,16 @@ TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsGarbageCollectionStressTest) CHECK_OK(request_tracker_->NewSeqNo(&stubborn_req_seq_num)); ASSERT_EQ(stubborn_req_seq_num, 0); - scoped_refptr stubborn_thread; - CHECK_OK(kudu::Thread::Create( - "stubborn", "stubborn", &ExactlyOnceRpcTest::StubbornlyWriteTheSameRequestThread, - this, stubborn_req_seq_num, stubborn_run_for, &stubborn_thread)); + thread stubborn_thread([this, stubborn_req_seq_num, stubborn_run_for]() { + this->StubbornlyWriteTheSameRequestThread(stubborn_req_seq_num, stubborn_run_for); + }); - scoped_refptr write_thread; - CHECK_OK(kudu::Thread::Create( - "write", "write", &ExactlyOnceRpcTest::DoLongWritesThread, - this, writes_run_for, &write_thread)); + thread write_thread([this, writes_run_for]() { + this->DoLongWritesThread(writes_run_for); + }); - write_thread->Join(); - stubborn_thread->Join(); + write_thread.join(); + stubborn_thread.join(); // Within a few seconds, the consumption should be back to zero. // Really, this should be within 100ms, but we'll give it a bit of diff --git a/src/kudu/rpc/mt-rpc-test.cc b/src/kudu/rpc/mt-rpc-test.cc index 0245ce6..e7fde11 100644 --- a/src/kudu/rpc/mt-rpc-test.cc +++ b/src/kudu/rpc/mt-rpc-test.cc @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -27,7 +28,6 @@ #include "kudu/gutil/port.h" #include "kudu/gutil/ref_counted.h" -#include "kudu/gutil/strings/substitute.h" #include "kudu/rpc/acceptor_pool.h" #include "kudu/rpc/messenger.h" #include "kudu/rpc/proxy.h" @@ -43,17 +43,15 @@ #include "kudu/util/status.h" #include "kudu/util/stopwatch.h" #include "kudu/util/test_macros.h" -#include "kudu/util/thread.h" - METRIC_DECLARE_counter(rpc_connections_accepted); METRIC_DECLARE_counter(rpcs_queue_overflow); using std::string; using std::shared_ptr; +using std::thread; using std::unique_ptr; using std::vector; -using strings::Substitute; namespace kudu { namespace rpc { @@ -102,8 +100,8 @@ class MultiThreadedRpcTest : public RpcTestBase { } }; -static void AssertShutdown(kudu::Thread* thread, const Status* status) { - ASSERT_OK(ThreadJoiner(thread).warn_every_ms(500).Join()); +static void AssertShutdown(thread* thread, const Status* status) { + thread->join(); string msg = status->ToString(); ASSERT_TRUE(msg.find("Service unavailable") != string::npos || msg.find("Network error") != string::npos) @@ -117,13 +115,14 @@ TEST_F(MultiThreadedRpcTest, TestShutdownDuringService) { Sockaddr server_addr; ASSERT_OK(StartTestServer(&server_addr)); - const int kNumThreads = 4; - scoped_refptr threads[kNumThreads]; + constexpr int kNumThreads = 4; + thread threads[kNumThreads]; Status statuses[kNumThreads]; for (int i = 0; i < kNumThreads; i++) { - ASSERT_OK(kudu::Thread::Create("test", strings::Substitute("t$0", i), - &MultiThreadedRpcTest::HammerServer, this, server_addr, - GenericCalculatorService::kAddMethodName, &statuses[i], &threads[i])); + auto* my_status = &statuses[i]; + threads[i] = thread([this, server_addr, my_status]() { + this->HammerServer(server_addr, GenericCalculatorService::kAddMethodName, my_status); + }); } SleepFor(MonoDelta::FromMilliseconds(50)); @@ -134,7 +133,7 @@ TEST_F(MultiThreadedRpcTest, TestShutdownDuringService) { server_messenger_->Shutdown(); for (int i = 0; i < kNumThreads; i++) { - AssertShutdown(threads[i].get(), &statuses[i]); + AssertShutdown(&threads[i], &statuses[i]); } } @@ -148,11 +147,11 @@ TEST_F(MultiThreadedRpcTest, TestShutdownClientWhileCallsPending) { shared_ptr client_messenger; ASSERT_OK(CreateMessenger("Client", &client_messenger)); - scoped_refptr thread; Status status; - ASSERT_OK(kudu::Thread::Create("test", "test", - &MultiThreadedRpcTest::HammerServerWithMessenger, this, server_addr, - GenericCalculatorService::kAddMethodName, &status, client_messenger, &thread)); + thread thread([this, server_addr, &status, client_messenger]() { + this->HammerServerWithMessenger(server_addr, GenericCalculatorService::kAddMethodName, + &status, client_messenger); + }); // Shut down the messenger after a very brief sleep. This often will race so that the // call gets submitted to the messenger before shutdown, but the negotiation won't have @@ -162,7 +161,7 @@ TEST_F(MultiThreadedRpcTest, TestShutdownClientWhileCallsPending) { client_messenger->Shutdown(); client_messenger.reset(); - ASSERT_OK(ThreadJoiner(thread.get()).warn_every_ms(500).Join()); + thread.join(); ASSERT_TRUE(status.IsAborted() || status.IsServiceUnavailable()); string msg = status.ToString(); @@ -222,13 +221,16 @@ TEST_F(MultiThreadedRpcTest, TestBlowOutServiceQueue) { ASSERT_OK(service_pool_->Init(n_worker_threads_)); server_messenger_->RegisterService(service_name_, service_pool_); - scoped_refptr threads[3]; - Status status[3]; + constexpr int kNumThreads = 3; + thread threads[kNumThreads]; + Status status[kNumThreads]; CountDownLatch latch(1); - for (int i = 0; i < 3; i++) { - ASSERT_OK(kudu::Thread::Create("test", strings::Substitute("t$0", i), - &MultiThreadedRpcTest::SingleCall, this, server_addr, - GenericCalculatorService::kAddMethodName, &status[i], &latch, &threads[i])); + for (int i = 0; i < kNumThreads; i++) { + auto* my_status = &status[i]; + threads[i] = thread([this, server_addr, my_status, &latch]() { + this->SingleCall(server_addr, GenericCalculatorService::kAddMethodName, + my_status, &latch); + }); } // One should immediately fail due to backpressure. The latch is only initialized @@ -240,8 +242,8 @@ TEST_F(MultiThreadedRpcTest, TestBlowOutServiceQueue) { service_pool_->Shutdown(); server_messenger_->Shutdown(); - for (const auto& thread : threads) { - ASSERT_OK(ThreadJoiner(thread.get()).warn_every_ms(500).Join()); + for (auto& t : threads) { + t.join(); } // Verify that one error was due to backpressure. @@ -285,12 +287,11 @@ TEST_F(MultiThreadedRpcTest, TestShutdownWithIncomingConnections) { ASSERT_OK(StartTestServer(&server_addr)); // Start a number of threads which just hammer the server with TCP connections. - vector > threads; - for (int i = 0; i < 8; i++) { - scoped_refptr new_thread; - CHECK_OK(kudu::Thread::Create("test", strings::Substitute("t$0", i), - &HammerServerWithTCPConns, server_addr, &new_thread)); - threads.push_back(new_thread); + constexpr int kNumThreads = 8; + vector threads; + threads.reserve(kNumThreads); + for (int i = 0; i < kNumThreads; i++) { + threads.emplace_back([server_addr]() { HammerServerWithTCPConns(server_addr); }); } // Sleep until the server has started to actually accept some connections from the @@ -306,8 +307,8 @@ TEST_F(MultiThreadedRpcTest, TestShutdownWithIncomingConnections) { service_pool_->Shutdown(); server_messenger_->Shutdown(); - for (scoped_refptr& t : threads) { - ASSERT_OK(ThreadJoiner(t.get()).warn_every_ms(500).Join()); + for (auto& t : threads) { + t.join(); } } diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h index 88ea808..59bfd41 100644 --- a/src/kudu/rpc/rpc-test-base.h +++ b/src/kudu/rpc/rpc-test-base.h @@ -20,6 +20,7 @@ #include #include #include +#include #include #include "kudu/gutil/walltime.h" @@ -289,11 +290,8 @@ class CalculatorService : public CalculatorServiceIf { } if (req->deferred()) { - // Spawn a new thread which does the sleep and responds later. - scoped_refptr thread; - CHECK_OK(Thread::Create("rpc-test", "deferred", - &CalculatorService::DoSleep, this, req, context, - &thread)); + std::thread t([this, req, context]() { this->DoSleep(req, context); }); + t.detach(); return; } DoSleep(req, context); diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc index cb78577..58f0f16 100644 --- a/src/kudu/rpc/rpc-test.cc +++ b/src/kudu/rpc/rpc-test.cc @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -64,7 +65,6 @@ #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" METRIC_DECLARE_histogram(handler_latency_kudu_rpc_test_CalculatorService_Sleep); METRIC_DECLARE_histogram(rpc_incoming_queue_time); @@ -77,6 +77,7 @@ DECLARE_int32(tcp_keepalive_retry_count); using std::shared_ptr; using std::string; +using std::thread; using std::unique_ptr; using std::unordered_map; using std::vector; @@ -880,7 +881,7 @@ TEST_P(TestRpc, TestRpcSidecarLimits) { // value. This tests the client's ability to send the maximal message. // The server will reject the message after it has been transferred. // This test is disabled for TSAN due to high memory requirements. - std::vector rpc_max_message_values; + vector rpc_max_message_values; rpc_max_message_values.push_back(FLAGS_rpc_max_message_size); #ifndef THREAD_SANITIZER rpc_max_message_values.push_back(std::numeric_limits::max()); @@ -1002,10 +1003,8 @@ TEST_F(TestRpc, TestNegotiationTimeout) { ASSERT_OK(StartFakeServer(&listen_sock, &server_addr)); // Create another thread to accept the connection on the fake server. - scoped_refptr acceptor_thread; - ASSERT_OK(Thread::Create("test", "acceptor", - AcceptAndReadForever, &listen_sock, - &acceptor_thread)); + thread acceptor_thread([&listen_sock]() { AcceptAndReadForever(&listen_sock); }); + SCOPED_CLEANUP({ acceptor_thread.join(); }); // Set up client. shared_ptr client_messenger; @@ -1017,8 +1016,6 @@ TEST_F(TestRpc, TestNegotiationTimeout) { NO_FATALS(DoTestExpectTimeout( p, MonoDelta::FromMilliseconds(100), false, &is_negotiation_error)); EXPECT_TRUE(is_negotiation_error); - - acceptor_thread->Join(); } // Test that client calls get failed properly when the server they're connected to @@ -1470,15 +1467,15 @@ TEST_P(TestRpc, TestCancellationMultiThreads) { Slice slice(buf); // Start a bunch of threads which invoke async RPC and cancellation. - std::vector> threads; - for (int i = 0; i < 30; ++i) { - scoped_refptr rpc_thread; - ASSERT_OK(Thread::Create("test", "rpc", SendAndCancelRpcs, &p, slice, &rpc_thread)); - threads.push_back(rpc_thread); + constexpr int kNumThreads = 30; + vector threads; + threads.reserve(kNumThreads); + for (int i = 0; i < kNumThreads; ++i) { + threads.emplace_back([&p, slice]() { SendAndCancelRpcs(&p, slice); }); } // Wait for all threads to complete. - for (scoped_refptr& rpc_thread : threads) { - rpc_thread->Join(); + for (auto& t : threads) { + t.join(); } client_messenger->Shutdown(); } diff --git a/src/kudu/tablet/lock_manager-test.cc b/src/kudu/tablet/lock_manager-test.cc index 719da5f..6c57b63 100644 --- a/src/kudu/tablet/lock_manager-test.cc +++ b/src/kudu/tablet/lock_manager-test.cc @@ -15,12 +15,15 @@ // specific language governing permissions and limitations // under the License. +#include "kudu/tablet/lock_manager.h" + #include #include #include #include #include #include +#include #include #include @@ -28,18 +31,15 @@ #include #include "kudu/gutil/macros.h" -#include "kudu/gutil/ref_counted.h" #include "kudu/gutil/stringprintf.h" -#include "kudu/tablet/lock_manager.h" #include "kudu/util/env.h" #include "kudu/util/slice.h" -#include "kudu/util/status.h" #include "kudu/util/stopwatch.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" using std::shared_ptr; using std::string; +using std::thread; using std::vector; DEFINE_int32(num_test_threads, 10, "number of stress test client threads"); @@ -161,7 +161,7 @@ class LmTestThread { : manager_(manager), keys_(std::move(keys)), resources_(resources) {} void Start() { - CHECK_OK(kudu::Thread::Create("test", "test", &LmTestThread::Run, this, &thread_)); + thread_ = thread([this]() { this->Run(); }); } void Run() { @@ -187,11 +187,7 @@ class LmTestThread { } void Join() { - CHECK_OK(ThreadJoiner(thread_.get()). - warn_after_ms(1000). - warn_every_ms(5000). - Join()); - thread_ = nullptr; + thread_.join(); } private: @@ -200,7 +196,7 @@ class LmTestThread { vector keys_; const vector resources_; uint64_t tid_; - scoped_refptr thread_; + thread thread_; }; static void runPerformanceTest(const char *test_type, diff --git a/src/kudu/tablet/mt-rowset_delta_compaction-test.cc b/src/kudu/tablet/mt-rowset_delta_compaction-test.cc index 08a54df..70201e5 100644 --- a/src/kudu/tablet/mt-rowset_delta_compaction-test.cc +++ b/src/kudu/tablet/mt-rowset_delta_compaction-test.cc @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -29,19 +30,15 @@ #include "kudu/common/rowblock.h" #include "kudu/common/schema.h" #include "kudu/gutil/atomicops.h" -#include "kudu/gutil/ref_counted.h" -#include "kudu/gutil/strings/substitute.h" #include "kudu/tablet/diskrowset-test-base.h" #include "kudu/tablet/diskrowset.h" #include "kudu/tablet/rowset.h" -#include "kudu/tablet/tablet-test-util.h" #include "kudu/tablet/tablet.pb.h" #include "kudu/util/memory/arena.h" #include "kudu/util/monotime.h" #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" enum { kDefaultNumSecondsPerThread = 1, @@ -56,6 +53,7 @@ DEFINE_int32(num_seconds_per_thread, kDefaultNumSecondsPerThread, "Minimum number of seconds each thread should work"); using std::shared_ptr; +using std::thread; using std::unique_ptr; using std::vector; @@ -129,34 +127,19 @@ class TestMultiThreadedRowSetDeltaCompaction : public TestRowSet { void StartThreads(DiskRowSet *rs) { for (int i = 0; i < FLAGS_num_update_threads; i++) { - scoped_refptr thread; - CHECK_OK(kudu::Thread::Create("test", strings::Substitute("log_writer$0", i), - &TestMultiThreadedRowSetDeltaCompaction::RowSetUpdateThread, this, rs, &thread)); - update_threads_.push_back(thread); + threads_.emplace_back([this, rs]() { this->RowSetUpdateThread(rs); }); } for (int i = 0; i < FLAGS_num_flush_threads; i++) { - scoped_refptr thread; - CHECK_OK(kudu::Thread::Create("test", strings::Substitute("delta_flush$0", i), - &TestMultiThreadedRowSetDeltaCompaction::RowSetFlushThread, this, rs, &thread)); - flush_threads_.push_back(thread); + threads_.emplace_back([this, rs]() { this->RowSetFlushThread(rs); }); } for (int i = 0; i < FLAGS_num_compaction_threads; i++) { - scoped_refptr thread; - CHECK_OK(kudu::Thread::Create("test", strings::Substitute("delta_compaction$0", i), - &TestMultiThreadedRowSetDeltaCompaction::RowSetDeltaCompactionThread, this, rs, &thread)); - compaction_threads_.push_back(thread); + threads_.emplace_back([this, rs]() { this->RowSetDeltaCompactionThread(rs); }); } } void JoinThreads() { - for (const auto& thread : update_threads_) { - ASSERT_OK(ThreadJoiner(thread.get()).Join()); - } - for (const auto& thread : flush_threads_) { - ASSERT_OK(ThreadJoiner(thread.get()).Join()); - } - for (const auto& thread : compaction_threads_) { - ASSERT_OK(ThreadJoiner(thread.get()).Join()); + for (auto& t : threads_) { + t.join(); } } @@ -192,9 +175,7 @@ class TestMultiThreadedRowSetDeltaCompaction : public TestRowSet { Atomic32 update_counter_; Atomic32 should_run_; - vector > update_threads_; - vector > flush_threads_; - vector > compaction_threads_; + vector threads_; }; static void SetupFlagsForSlowTests() { diff --git a/src/kudu/tablet/mt-tablet-test.cc b/src/kudu/tablet/mt-tablet-test.cc index ac8a5bb..2321efe 100644 --- a/src/kudu/tablet/mt-tablet-test.cc +++ b/src/kudu/tablet/mt-tablet-test.cc @@ -17,13 +17,14 @@ #include #include +#include #include #include #include +#include #include #include -#include #include #include @@ -36,13 +37,10 @@ #include "kudu/common/rowid.h" #include "kudu/common/schema.h" #include "kudu/gutil/basictypes.h" -#include "kudu/gutil/ref_counted.h" -#include "kudu/gutil/strings/substitute.h" #include "kudu/tablet/local_tablet_writer.h" #include "kudu/tablet/rowset.h" #include "kudu/tablet/tablet-harness.h" #include "kudu/tablet/tablet-test-base.h" -#include "kudu/tablet/tablet-test-util.h" #include "kudu/tablet/tablet.h" #include "kudu/util/countdown_latch.h" #include "kudu/util/faststring.h" @@ -52,7 +50,6 @@ #include "kudu/util/stopwatch.h" #include "kudu/util/test_graph.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" DECLARE_int32(tablet_history_max_age_sec); DECLARE_double(tablet_delta_store_major_compact_min_ratio); @@ -78,7 +75,9 @@ DEFINE_double(flusher_backoff, 2.0f, "Ratio to backoff the flusher thread"); DEFINE_int32(flusher_initial_frequency_ms, 30, "Number of ms to wait between flushes"); using std::shared_ptr; +using std::thread; using std::unique_ptr; +using std::vector; namespace kudu { namespace tablet { @@ -424,23 +423,19 @@ class MultiThreadedTabletTest : public TabletTestBase { } } - template - void StartThreads(int n_threads, const FunctionType &function) { + void StartThreads(int n_threads, const std::function& function) { for (int i = 0; i < n_threads; i++) { - scoped_refptr new_thread; - CHECK_OK(kudu::Thread::Create("test", strings::Substitute("test$0", i), - function, this, i, &new_thread)); - threads_.push_back(new_thread); + threads_.emplace_back([=]() { function(i); }); } } void JoinThreads() { - for (scoped_refptr thr : threads_) { - CHECK_OK(ThreadJoiner(thr.get()).Join()); + for (auto& t : threads_) { + t.join(); } } - std::vector > threads_; + vector threads_; CountDownLatch running_insert_count_; // Projection with only an int column. @@ -462,20 +457,29 @@ TYPED_TEST(MultiThreadedTabletTest, DoTestAllAtOnce) { } // Spawn a bunch of threads, each of which will do updates. - this->StartThreads(1, &TestFixture::CollectStatisticsThread); - this->StartThreads(FLAGS_num_insert_threads, &TestFixture::InsertThread); - this->StartThreads(FLAGS_num_counter_threads, &TestFixture::CountThread); - this->StartThreads(FLAGS_num_summer_threads, &TestFixture::SummerThread); - this->StartThreads(FLAGS_num_flush_threads, &TestFixture::FlushThread); - this->StartThreads(FLAGS_num_compact_threads, &TestFixture::CompactThread); - this->StartThreads(FLAGS_num_undo_delta_gc_threads, &TestFixture::DeleteAncientUndoDeltasThread); - this->StartThreads(FLAGS_num_flush_delta_threads, &TestFixture::FlushDeltasThread); + this->StartThreads(1, [this](int i) { this->CollectStatisticsThread(i); }); + this->StartThreads(FLAGS_num_insert_threads, + [this](int i) { this->InsertThread(i); }); + this->StartThreads(FLAGS_num_counter_threads, + [this](int i) { this->CountThread(i); }); + this->StartThreads(FLAGS_num_summer_threads, + [this](int i) { this->SummerThread(i); }); + this->StartThreads(FLAGS_num_flush_threads, + [this](int i) { this->FlushThread(i); }); + this->StartThreads(FLAGS_num_compact_threads, + [this](int i) { this->CompactThread(i); }); + this->StartThreads(FLAGS_num_undo_delta_gc_threads, + [this](int i) { this->DeleteAncientUndoDeltasThread(i); }); + this->StartThreads(FLAGS_num_flush_delta_threads, + [this](int i) { this->FlushDeltasThread(i); }); this->StartThreads(FLAGS_num_minor_compact_deltas_threads, - &TestFixture::MinorCompactDeltasThread); + [this](int i) { this->MinorCompactDeltasThread(i); }); this->StartThreads(FLAGS_num_major_compact_deltas_threads, - &TestFixture::MajorCompactDeltasThread); - this->StartThreads(FLAGS_num_slowreader_threads, &TestFixture::SlowReaderThread); - this->StartThreads(FLAGS_num_updater_threads, &TestFixture::UpdateThread); + [this](int i) { this->MajorCompactDeltasThread(i); }); + this->StartThreads(FLAGS_num_slowreader_threads, + [this](int i) { this->SlowReaderThread(i); }); + this->StartThreads(FLAGS_num_updater_threads, + [this](int i) { this->UpdateThread(i); }); this->JoinThreads(); LOG_TIMING(INFO, "Summing int32 column") { uint64_t sum = this->CountSum(shared_ptr()); @@ -493,21 +497,27 @@ TYPED_TEST(MultiThreadedTabletTest, DoTestAllAtOnce) { // of DELETE/REINSERT during flushes. TYPED_TEST(MultiThreadedTabletTest, DeleteAndReinsert) { google::FlagSaver saver; - FLAGS_flusher_backoff = 1.0f; + FLAGS_flusher_backoff = 1.0F; FLAGS_flusher_initial_frequency_ms = 1; - FLAGS_tablet_delta_store_major_compact_min_ratio = 0.01f; + FLAGS_tablet_delta_store_major_compact_min_ratio = 0.01F; FLAGS_tablet_delta_store_minor_compact_max = 10; - this->StartThreads(1, &TestFixture::CollectStatisticsThread); - this->StartThreads(FLAGS_num_flush_threads, &TestFixture::FlushThread); - this->StartThreads(FLAGS_num_compact_threads, &TestFixture::CompactThread); - this->StartThreads(FLAGS_num_undo_delta_gc_threads, &TestFixture::DeleteAncientUndoDeltasThread); - this->StartThreads(FLAGS_num_flush_delta_threads, &TestFixture::FlushDeltasThread); + this->StartThreads(1, [this](int i) { this->CollectStatisticsThread(i); }); + this->StartThreads(FLAGS_num_flush_threads, + [this](int i) { this->FlushThread(i); }); + this->StartThreads(FLAGS_num_compact_threads, + [this](int i) { this->CompactThread(i); }); + this->StartThreads(FLAGS_num_undo_delta_gc_threads, + [this](int i) { this->DeleteAncientUndoDeltasThread(i); }); + this->StartThreads(FLAGS_num_flush_delta_threads, + [this](int i) { this->FlushDeltasThread(i); }); this->StartThreads(FLAGS_num_minor_compact_deltas_threads, - &TestFixture::MinorCompactDeltasThread); + [this](int i) { this->MinorCompactDeltasThread(i); }); this->StartThreads(FLAGS_num_major_compact_deltas_threads, - &TestFixture::MajorCompactDeltasThread); - this->StartThreads(10, &TestFixture::DeleteAndReinsertCycleThread); - this->StartThreads(10, &TestFixture::StubbornlyUpdateSameRowThread); + [this](int i) { this->MajorCompactDeltasThread(i); }); + this->StartThreads(10, + [this](int i) { this->DeleteAndReinsertCycleThread(i); }); + this->StartThreads(10, + [this](int i) { this->StubbornlyUpdateSameRowThread(i); }); // Run very quickly in dev builds, longer in slow builds. float runtime_seconds = AllowSlowTests() ? 2 : 0.1; @@ -542,17 +552,22 @@ TYPED_TEST(MultiThreadedHybridClockTabletTest, UpdateNoMergeCompaction) { google::FlagSaver saver; FLAGS_tablet_history_max_age_sec = 0; // GC data as aggressively as possible. - FLAGS_flusher_backoff = 1.0f; + FLAGS_flusher_backoff = 1.0F; FLAGS_flusher_initial_frequency_ms = 1; - FLAGS_tablet_delta_store_major_compact_min_ratio = 0.01f; + FLAGS_tablet_delta_store_major_compact_min_ratio = 0.01F; FLAGS_tablet_delta_store_minor_compact_max = 10; - this->StartThreads(FLAGS_num_flush_threads, &TestFixture::FlushThread); - this->StartThreads(FLAGS_num_flush_delta_threads, &TestFixture::FlushDeltasThread); + this->StartThreads(FLAGS_num_flush_threads, + [this](int i) { this->FlushThread(i); }); + this->StartThreads(FLAGS_num_flush_delta_threads, + [this](int i) { this->FlushDeltasThread(i); }); this->StartThreads(FLAGS_num_major_compact_deltas_threads, - &TestFixture::MajorCompactDeltasThread); - this->StartThreads(10, &TestFixture::DeleteAndReinsertCycleThread); - this->StartThreads(10, &TestFixture::StubbornlyUpdateSameRowThread); - this->StartThreads(FLAGS_num_undo_delta_gc_threads, &TestFixture::DeleteAncientUndoDeltasThread); + [this](int i) { this->MajorCompactDeltasThread(i); }); + this->StartThreads(10, + [this](int i) { this->DeleteAndReinsertCycleThread(i); }); + this->StartThreads(10, + [this](int i) { this->StubbornlyUpdateSameRowThread(i); }); + this->StartThreads(FLAGS_num_undo_delta_gc_threads, + [this](int i) { this->DeleteAncientUndoDeltasThread(i); }); // Run very quickly in dev builds, longer in slow builds. float runtime_seconds = AllowSlowTests() ? 2 : 0.1; diff --git a/src/kudu/tablet/tablet_random_access-test.cc b/src/kudu/tablet/tablet_random_access-test.cc index b02af28..6652ec8 100644 --- a/src/kudu/tablet/tablet_random_access-test.cc +++ b/src/kudu/tablet/tablet_random_access-test.cc @@ -20,9 +20,9 @@ #include #include #include +#include #include -#include // IWYU pragma: keep #include #include #include @@ -38,7 +38,6 @@ #include "kudu/common/schema.h" #include "kudu/common/wire_protocol.pb.h" #include "kudu/gutil/port.h" -#include "kudu/gutil/ref_counted.h" #include "kudu/tablet/key_value_test_schema.h" #include "kudu/tablet/local_tablet_writer.h" #include "kudu/tablet/rowset.h" @@ -47,11 +46,11 @@ #include "kudu/util/countdown_latch.h" #include "kudu/util/memory/arena.h" #include "kudu/util/monotime.h" +#include "kudu/util/scoped_cleanup.h" #include "kudu/util/status.h" #include "kudu/util/stopwatch.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" DEFINE_int32(keyspace_size, 3000, "number of unique row keys to insert/mutate"); DEFINE_int32(runtime_seconds, 1, "number of seconds to run the test"); @@ -63,6 +62,7 @@ DECLARE_int32(deltafile_default_block_size); using boost::optional; using std::string; +using std::thread; using std::unique_ptr; using std::vector; @@ -333,14 +333,12 @@ class TestRandomAccess : public KuduTabletTest { }; TEST_F(TestRandomAccess, Test) { - scoped_refptr flush_thread; - CHECK_OK(Thread::Create("test", "flush", - boost::bind(&TestRandomAccess::BackgroundOpThread, this), - &flush_thread)); - - DoRandomBatches(); - done_.CountDown(); - flush_thread->Join(); + thread flush_thread([this]() { this->BackgroundOpThread(); }); + SCOPED_CLEANUP({ + done_.CountDown(); + flush_thread.join(); + }); + NO_FATALS(DoRandomBatches()); } diff --git a/src/kudu/tablet/transactions/transaction_tracker-test.cc b/src/kudu/tablet/transactions/transaction_tracker-test.cc index f1481a8..fd71d41 100644 --- a/src/kudu/tablet/transactions/transaction_tracker-test.cc +++ b/src/kudu/tablet/transactions/transaction_tracker-test.cc @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -42,7 +43,6 @@ #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" DECLARE_int64(tablet_transaction_memory_limit_mb); @@ -56,6 +56,7 @@ METRIC_DECLARE_counter(transaction_memory_limit_rejections); using std::pair; using std::shared_ptr; +using std::thread; using std::unique_ptr; using std::vector; @@ -174,10 +175,7 @@ void TransactionTrackerTest::RunTransactionsThread(CountDownLatch* finish_latch) // Regression test for KUDU-384 (thread safety issue with TestWaitForAllToFinish) TEST_F(TransactionTrackerTest, TestWaitForAllToFinish) { CountDownLatch finish_latch(1); - scoped_refptr thr; - CHECK_OK(Thread::Create("test", "txn-thread", - &TransactionTrackerTest::RunTransactionsThread, this, &finish_latch, - &thr)); + thread thr([this, &finish_latch]() { this->RunTransactionsThread(&finish_latch); }); // Wait for the txns to start. while (tracker_.GetNumPendingForTests() == 0) { @@ -189,7 +187,7 @@ TEST_F(TransactionTrackerTest, TestWaitForAllToFinish) { finish_latch.CountDown(); tracker_.WaitForAllToFinish(); - CHECK_OK(ThreadJoiner(thr.get()).Join()); + thr.join(); ASSERT_EQ(tracker_.GetNumPendingForTests(), 0); } diff --git a/src/kudu/tools/ksck_remote-test.cc b/src/kudu/tools/ksck_remote-test.cc index ace10e2..55fe652 100644 --- a/src/kudu/tools/ksck_remote-test.cc +++ b/src/kudu/tools/ksck_remote-test.cc @@ -21,10 +21,10 @@ #include #include #include +#include #include #include -#include #include #include #include @@ -36,7 +36,6 @@ #include "kudu/common/partial_row.h" #include "kudu/gutil/macros.h" #include "kudu/gutil/port.h" -#include "kudu/gutil/ref_counted.h" #include "kudu/gutil/stl_util.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/integration-tests/data_gen_util.h" @@ -59,7 +58,6 @@ #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" DECLARE_bool(checksum_scan); DECLARE_bool(consensus); @@ -88,6 +86,7 @@ using kudu::client::KuduTableCreator; using kudu::client::sp::shared_ptr; using kudu::cluster::InternalMiniCluster; using kudu::cluster::InternalMiniClusterOptions; +using std::thread; using std::string; using std::unique_ptr; using std::vector; @@ -424,20 +423,18 @@ TEST_F(RemoteKsckTest, TestChecksumSnapshot) { CountDownLatch started_writing(1); AtomicBool continue_writing(true); Promise promise; - scoped_refptr writer_thread; // Allow the checksum scan to wait for longer in case it takes a while for the // writer thread to advance safe time. FLAGS_scanner_max_wait_ms = 10000; - Thread::Create("RemoteKsckTest", "TestChecksumSnapshot", - &RemoteKsckTest::GenerateRowWritesLoop, this, - &started_writing, boost::cref(continue_writing), &promise, - &writer_thread); + thread writer_thread([&]() { + this->GenerateRowWritesLoop(&started_writing, continue_writing, &promise); + }); { SCOPED_CLEANUP({ continue_writing.Store(false); - writer_thread->Join(); + writer_thread.join(); }); started_writing.Wait(); @@ -468,20 +465,18 @@ TEST_F(RemoteKsckTest, TestChecksumSnapshotCurrentTimestamp) { CountDownLatch started_writing(1); AtomicBool continue_writing(true); Promise promise; - scoped_refptr writer_thread; // Allow the checksum scan to wait for longer in case it takes a while for the // writer thread to advance safe time. FLAGS_scanner_max_wait_ms = 10000; - Thread::Create("RemoteKsckTest", "TestChecksumSnapshotCurrentTimestamp", - &RemoteKsckTest::GenerateRowWritesLoop, this, - &started_writing, boost::cref(continue_writing), &promise, - &writer_thread); + thread writer_thread([&]() { + this->GenerateRowWritesLoop(&started_writing, continue_writing, &promise); + }); { SCOPED_CLEANUP({ continue_writing.Store(false); - writer_thread->Join(); + writer_thread.join(); }); started_writing.Wait(); diff --git a/src/kudu/tserver/tablet_server-stress-test.cc b/src/kudu/tserver/tablet_server-stress-test.cc index 95d6a54..5069d26 100644 --- a/src/kudu/tserver/tablet_server-stress-test.cc +++ b/src/kudu/tserver/tablet_server-stress-test.cc @@ -14,32 +14,27 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -#include "kudu/tserver/tablet_server-test-base.h" - #include #include #include -#include #include +#include #include -#include #include #include #include "kudu/gutil/port.h" #include "kudu/gutil/ref_counted.h" -#include "kudu/gutil/strings/substitute.h" +#include "kudu/tserver/tablet_server-test-base.h" #include "kudu/util/countdown_latch.h" #include "kudu/util/jsonwriter.h" #include "kudu/util/metrics.h" #include "kudu/util/monotime.h" #include "kudu/util/process_memory.h" -#include "kudu/util/status.h" #include "kudu/util/stopwatch.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" DEFINE_int32(runtime_secs, 10, "Maximum number of seconds to run. If the threads have not completed " @@ -61,6 +56,10 @@ METRIC_DEFINE_histogram(test, insert_latency, 10000000, 2); +using std::ostringstream; +using std::thread; +using std::vector; + namespace kudu { namespace tserver { @@ -87,16 +86,13 @@ class TSStressTest : public TabletServerTestBase { void StartThreads() { for (int i = 0; i < FLAGS_num_inserter_threads; i++) { - scoped_refptr new_thread; - CHECK_OK(kudu::Thread::Create("test", strings::Substitute("test$0", i), - &TSStressTest::InserterThread, this, i, &new_thread)); - threads_.push_back(new_thread); + threads_.emplace_back([this, i]() { this->InserterThread(i); }); } } void JoinThreads() { - for (scoped_refptr thr : threads_) { - CHECK_OK(ThreadJoiner(thr.get()).Join()); + for (auto& t : threads_) { + t.join(); } } @@ -106,7 +102,7 @@ class TSStressTest : public TabletServerTestBase { scoped_refptr histogram_; CountDownLatch start_latch_; CountDownLatch stop_latch_; - std::vector > threads_; + vector threads_; }; void TSStressTest::InserterThread(int thread_idx) { @@ -128,14 +124,14 @@ void TSStressTest::InserterThread(int thread_idx) { } TEST_F(TSStressTest, TestMTInserts) { - std::thread timeout_thread; + thread timeout_thread; StartThreads(); Stopwatch s(Stopwatch::ALL_THREADS); s.start(); // Start a thread to fire 'stop_latch_' after the prescribed number of seconds. if (FLAGS_runtime_secs > 0) { - timeout_thread = std::thread([&]() { + timeout_thread = thread([&]() { stop_latch_.WaitFor(MonoDelta::FromSeconds(FLAGS_runtime_secs)); stop_latch_.CountDown(); }); @@ -149,7 +145,7 @@ TEST_F(TSStressTest, TestMTInserts) { LOG(INFO) << "CPU efficiency: " << (num_rows / s.elapsed().user_cpu_seconds()) << " rows/cpusec"; // Generate the JSON. - std::ostringstream out; + ostringstream out; JsonWriter writer(&out, JsonWriter::PRETTY); ASSERT_OK(histogram_->WriteAsJson(&writer, MetricJsonOptions())); diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc index 30dfb43..f49475f 100644 --- a/src/kudu/tserver/tablet_server-test.cc +++ b/src/kudu/tserver/tablet_server-test.cc @@ -117,7 +117,6 @@ #include "kudu/util/stopwatch.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" #include "kudu/util/zlib.h" using google::protobuf::util::MessageDifferencer; @@ -3829,10 +3828,6 @@ class DelayFsyncLogHook : public log::LogFaultHooks { namespace { -void DeleteOneRowAsync(TabletServerTest* test) { - test->DeleteTestRowsRemote(10, 1); -} - void CompactAsync(Tablet* tablet, CountDownLatch* flush_done_latch) { CHECK_OK(tablet->Compact(Tablet::FORCE_COMPACT_ALL)); flush_done_latch->CountDown(); @@ -3865,9 +3860,7 @@ TEST_F(TabletServerTest, TestKudu120PreRequisites) { log->SetLogFaultHooksForTests(log_hook); // Now start a transaction (delete) and stop just before commit. - scoped_refptr thread1; - CHECK_OK(kudu::Thread::Create("DeleteThread", "DeleteThread", - DeleteOneRowAsync, this, &thread1)); + thread delete_thread([this]() { this->DeleteTestRowsRemote(10, 1); }); // Wait for the replicate message to arrive and continue. log_hook->Continue(); @@ -3876,13 +3869,11 @@ TEST_F(TabletServerTest, TestKudu120PreRequisites) { usleep(100* 1000); // 100 msecs // Now start a compaction before letting the commit message go through. - scoped_refptr flush_thread; + Tablet* tablet = tablet_replica_->tablet(); CountDownLatch flush_done_latch(1); - CHECK_OK(kudu::Thread::Create("CompactThread", "CompactThread", - CompactAsync, - tablet_replica_->tablet(), - &flush_done_latch, - &flush_thread)); + thread flush_thread([tablet, &flush_done_latch]() { + CompactAsync(tablet, &flush_done_latch); + }); // At this point we have both a compaction and a transaction going on. // If we allow the transaction to return before the commit message is @@ -3905,6 +3896,8 @@ TEST_F(TabletServerTest, TestKudu120PreRequisites) { log_hook->Continue(); log_hook->Continue(); flush_done_latch.Wait(); + flush_thread.join(); + delete_thread.join(); } // Test DNS resolution failure in the master heartbeater. diff --git a/src/kudu/util/countdown_latch-test.cc b/src/kudu/util/countdown_latch-test.cc index 420248d..0cb8eee 100644 --- a/src/kudu/util/countdown_latch-test.cc +++ b/src/kudu/util/countdown_latch-test.cc @@ -15,18 +15,20 @@ // specific language governing permissions and limitations // under the License. +#include "kudu/util/countdown_latch.h" + #include +#include -#include // IWYU pragma: keep #include -#include "kudu/gutil/ref_counted.h" -#include "kudu/util/countdown_latch.h" #include "kudu/util/monotime.h" #include "kudu/util/test_macros.h" -#include "kudu/util/thread.h" #include "kudu/util/threadpool.h" +using std::thread; +using std::unique_ptr; + namespace kudu { static void DecrementLatch(CountDownLatch* latch, int amount) { @@ -40,8 +42,7 @@ static void DecrementLatch(CountDownLatch* latch, int amount) { // Tests that we can decrement the latch by arbitrary amounts, as well // as 1 by one. TEST(TestCountDownLatch, TestLatch) { - - std::unique_ptr pool; + unique_ptr pool; ASSERT_OK(ThreadPoolBuilder("cdl-test").set_max_threads(1).Build(&pool)); CountDownLatch latch(1000); @@ -63,13 +64,12 @@ TEST(TestCountDownLatch, TestLatch) { // continue. TEST(TestCountDownLatch, TestResetToZero) { CountDownLatch cdl(100); - scoped_refptr t; - ASSERT_OK(Thread::Create("test", "cdl-test", &CountDownLatch::Wait, &cdl, &t)); + thread t([&]() { cdl.Wait(); }); // Sleep for a bit until it's likely the other thread is waiting on the latch. SleepFor(MonoDelta::FromMilliseconds(10)); cdl.Reset(0); - t->Join(); + t.join(); } } // namespace kudu diff --git a/src/kudu/util/maintenance_manager-test.cc b/src/kudu/util/maintenance_manager-test.cc index 60d019d..9bda48c 100644 --- a/src/kudu/util/maintenance_manager-test.cc +++ b/src/kudu/util/maintenance_manager-test.cc @@ -17,19 +17,18 @@ #include "kudu/util/maintenance_manager.h" -#include - #include #include +#include #include #include #include #include #include #include +#include #include -#include // IWYU pragma: keep #include #include #include @@ -41,14 +40,13 @@ #include "kudu/util/monotime.h" #include "kudu/util/mutex.h" #include "kudu/util/scoped_cleanup.h" -#include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" using std::list; using std::shared_ptr; using std::string; +using std::thread; using strings::Substitute; METRIC_DEFINE_entity(test); @@ -239,15 +237,12 @@ TEST_F(MaintenanceManagerTest, TestRegisterUnregister) { // already registered. op1.set_remaining_runs(0); manager_->RegisterOp(&op1); - scoped_refptr thread; - CHECK_OK(Thread::Create( - "TestThread", "TestRegisterUnregister", - boost::bind(&TestMaintenanceOp::set_remaining_runs, &op1, 1), &thread)); + thread thread([&op1]() { op1.set_remaining_runs(1); }); + SCOPED_CLEANUP({ thread.join(); }); ASSERT_EVENTUALLY([&]() { - ASSERT_EQ(op1.DurationHistogram()->TotalCount(), 1); - }); + ASSERT_EQ(op1.DurationHistogram()->TotalCount(), 1); + }); manager_->UnregisterOp(&op1); - ThreadJoiner(thread.get()).Join(); } // Regression test for KUDU-1495: when an operation is being unregistered, diff --git a/src/kudu/util/mt-hdr_histogram-test.cc b/src/kudu/util/mt-hdr_histogram-test.cc index 7221644..ad77c79 100644 --- a/src/kudu/util/mt-hdr_histogram-test.cc +++ b/src/kudu/util/mt-hdr_histogram-test.cc @@ -16,26 +16,24 @@ // under the License. #include +#include +#include #include #include -#include #include -#include "kudu/gutil/ref_counted.h" -#include "kudu/gutil/stl_util.h" -#include "kudu/gutil/strings/substitute.h" #include "kudu/util/hdr_histogram.h" #include "kudu/util/monotime.h" -#include "kudu/util/status.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" DEFINE_int32(histogram_test_num_threads, 16, "Number of threads to spawn for mt-hdr_histogram test"); DEFINE_uint64(histogram_test_num_increments_per_thread, 100000LU, "Number of times to call Increment() per thread in mt-hdr_histogram test"); +using std::thread; +using std::unique_ptr; using std::vector; namespace kudu { @@ -64,19 +62,19 @@ TEST_F(MtHdrHistogramTest, ConcurrentWriteTest) { HdrHistogram hist(100000LU, 3); - auto threads = new scoped_refptr[num_threads_]; + vector threads; + threads.reserve(num_threads_); for (int i = 0; i < num_threads_; i++) { - CHECK_OK(kudu::Thread::Create("test", strings::Substitute("thread-$0", i), - IncrementSameHistValue, &hist, kValue, num_times_, &threads[i])); + threads.emplace_back([this, &hist]() { + IncrementSameHistValue(&hist, kValue, this->num_times_); + }); } - for (int i = 0; i < num_threads_; i++) { - CHECK_OK(ThreadJoiner(threads[i].get()).Join()); + for (auto& t : threads) { + t.join(); } HdrHistogram snapshot(hist); ASSERT_EQ(num_threads_ * num_times_, snapshot.CountInBucketForValue(kValue)); - - delete[] threads; } // Copy while writing, then iterate to ensure copies are consistent. @@ -86,31 +84,30 @@ TEST_F(MtHdrHistogramTest, ConcurrentCopyWhileWritingTest) { HdrHistogram hist(100000LU, 3); - auto threads = new scoped_refptr[num_threads_]; + vector threads; + threads.reserve(num_threads_); for (int i = 0; i < num_threads_; i++) { - CHECK_OK(kudu::Thread::Create("test", strings::Substitute("thread-$0", i), - IncrementSameHistValue, &hist, kValue, num_times_, &threads[i])); + threads.emplace_back([this, &hist]() { + IncrementSameHistValue(&hist, kValue, this->num_times_); + }); } // This is somewhat racy but the goal is to catch this issue at least // most of the time. At the time of this writing, before fixing a bug where // the total count stored in a copied histogram may not match its internal // counts (under concurrent writes), this test fails for me on 100/100 runs. - vector snapshots; - ElementDeleter deleter(&snapshots); + vector> snapshots; for (int i = 0; i < kNumCopies; i++) { - snapshots.push_back(new HdrHistogram(hist)); + snapshots.emplace_back(new HdrHistogram(hist)); SleepFor(MonoDelta::FromMicroseconds(100)); } for (int i = 0; i < kNumCopies; i++) { snapshots[i]->MeanValue(); // Will crash if underlying iterator is inconsistent. } - for (int i = 0; i < num_threads_; i++) { - CHECK_OK(ThreadJoiner(threads[i].get()).Join()); + for (auto& t : threads) { + t.join(); } - - delete[] threads; } } // namespace kudu diff --git a/src/kudu/util/mt-metrics-test.cc b/src/kudu/util/mt-metrics-test.cc index 37fdde3..5816737 100644 --- a/src/kudu/util/mt-metrics-test.cc +++ b/src/kudu/util/mt-metrics-test.cc @@ -19,10 +19,9 @@ #include #include #include +#include #include -#include // IWYU pragma: keep -#include // IWYU pragma: keep #include #include #include @@ -36,7 +35,6 @@ #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" DEFINE_int32(mt_metrics_test_num_threads, 4, "Number of threads to spawn in mt metrics tests"); @@ -47,6 +45,7 @@ namespace kudu { using debug::ScopedLeakCheckDisabler; using std::string; +using std::thread; using std::vector; class MultiThreadedMetricsTest : public KuduTest { @@ -65,16 +64,14 @@ static void CountWithCounter(scoped_refptr counter, int num_increments) } // Helper function that spawns and then joins a bunch of threads. -static void RunWithManyThreads(boost::function* f, int num_threads) { - vector > threads; +static void RunWithManyThreads(const std::function& f, int num_threads) { + vector threads; + threads.reserve(num_threads); for (int i = 0; i < num_threads; i++) { - scoped_refptr new_thread; - CHECK_OK(kudu::Thread::Create("test", StringPrintf("thread%d", i), - *f, &new_thread)); - threads.push_back(new_thread); + threads.emplace_back([f]() { f(); }); } for (int i = 0; i < num_threads; i++) { - ASSERT_OK(ThreadJoiner(threads[i].get()).Join()); + threads[i].join(); } } @@ -87,9 +84,8 @@ TEST_F(MultiThreadedMetricsTest, CounterIncrementTest) { scoped_refptr counter = new Counter(&METRIC_test_counter); int num_threads = FLAGS_mt_metrics_test_num_threads; int num_increments = 1000; - boost::function f = - boost::bind(CountWithCounter, counter, num_increments); - RunWithManyThreads(&f, num_threads); + RunWithManyThreads([=]() { CountWithCounter(counter, num_increments); }, + num_threads); ASSERT_EQ(num_threads * num_increments, counter->value()); } @@ -121,9 +117,8 @@ TEST_F(MultiThreadedMetricsTest, AddCounterToRegistryTest) { scoped_refptr entity = METRIC_ENTITY_test_entity.Instantiate(®istry_, "my-test"); int num_threads = FLAGS_mt_metrics_test_num_threads; int num_counters = 1000; - boost::function f = - boost::bind(RegisterCounters, entity, "prefix", num_counters); - RunWithManyThreads(&f, num_threads); + RunWithManyThreads([=]() { RegisterCounters(entity, "prefix", num_counters); }, + num_threads); ASSERT_EQ(num_threads * num_counters, entity->UnsafeMetricsMapForTests().size()); } diff --git a/src/kudu/util/mt-threadlocal-test.cc b/src/kudu/util/mt-threadlocal-test.cc index 8f0b23b..edabd52 100644 --- a/src/kudu/util/mt-threadlocal-test.cc +++ b/src/kudu/util/mt-threadlocal-test.cc @@ -16,32 +16,33 @@ // under the License. #include +#include #include #include #include -#include +#include #include +#include +#include #include #include #include "kudu/gutil/macros.h" #include "kudu/gutil/map-util.h" -#include "kudu/gutil/ref_counted.h" -#include "kudu/gutil/stl_util.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/util/countdown_latch.h" #include "kudu/util/env.h" #include "kudu/util/locks.h" #include "kudu/util/monotime.h" #include "kudu/util/mutex.h" -#include "kudu/util/status.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" #include "kudu/util/threadlocal.h" #include "kudu/util/threadlocal_cache.h" using std::string; +using std::thread; +using std::unique_ptr; using std::unordered_set; using std::vector; using strings::Substitute; @@ -54,6 +55,7 @@ class ThreadLocalTest : public KuduTest {}; const int kTargetCounterVal = 1000000; class Counter; + typedef unordered_set CounterPtrSet; typedef Mutex RegistryLockType; typedef simple_spinlock CounterLockType; @@ -185,18 +187,18 @@ static uint64_t Iterate(CounterRegistry* registry, int expected_counters) { static void TestThreadLocalCounters(CounterRegistry* registry, const int num_threads) { LOG(INFO) << "Starting threads..."; - vector > threads; + vector threads; + threads.reserve(num_threads); CountDownLatch counters_ready(num_threads); CountDownLatch reader_ready(1); CountDownLatch counters_done(num_threads); CountDownLatch reader_done(1); for (int i = 0; i < num_threads; i++) { - scoped_refptr new_thread; - CHECK_OK(kudu::Thread::Create("test", strings::Substitute("t$0", i), - &RegisterCounterAndLoopIncr, registry, &counters_ready, &reader_ready, - &counters_done, &reader_done, &new_thread)); - threads.push_back(new_thread); + threads.emplace_back([&]() { + RegisterCounterAndLoopIncr(registry, &counters_ready, &reader_ready, + &counters_done, &reader_done); + }); } // Wait for all threads to start and register their Counters. @@ -222,8 +224,8 @@ static void TestThreadLocalCounters(CounterRegistry* registry, const int num_thr reader_done.CountDown(); LOG(INFO) << "Joining & deleting threads..."; - for (scoped_refptr thread : threads) { - CHECK_OK(ThreadJoiner(thread.get()).Join()); + for (auto& t : threads) { + t.join(); } LOG(INFO) << "Done."; } @@ -241,24 +243,24 @@ TEST_F(ThreadLocalTest, TestConcurrentCounters) { // This class cannot be instantiated. The methods are all static. class ThreadLocalString { public: - static void set(std::string value); - static const std::string& get(); + static void set(string value); + static const string& get(); private: ThreadLocalString() { } - DECLARE_STATIC_THREAD_LOCAL(std::string, value_); + DECLARE_STATIC_THREAD_LOCAL(string, value_); DISALLOW_COPY_AND_ASSIGN(ThreadLocalString); }; -DEFINE_STATIC_THREAD_LOCAL(std::string, ThreadLocalString, value_); +DEFINE_STATIC_THREAD_LOCAL(string, ThreadLocalString, value_); -void ThreadLocalString::set(std::string value) { - INIT_STATIC_THREAD_LOCAL(std::string, value_); - *value_ = value; +void ThreadLocalString::set(string value) { + INIT_STATIC_THREAD_LOCAL(string, value_); + *value_ = std::move(value); } -const std::string& ThreadLocalString::get() { - INIT_STATIC_THREAD_LOCAL(std::string, value_); +const string& ThreadLocalString::get() { + INIT_STATIC_THREAD_LOCAL(string, value_); return *value_; } @@ -266,8 +268,8 @@ static void RunAndAssign(CountDownLatch* writers_ready, CountDownLatch *readers_ready, CountDownLatch *all_done, CountDownLatch *threads_exiting, - const std::string& in, - std::string* out) { + const string& in, + string* out) { writers_ready->Wait(); // Ensure it starts off as an empty string. CHECK_EQ("", ThreadLocalString::get()); @@ -282,28 +284,27 @@ static void RunAndAssign(CountDownLatch* writers_ready, TEST_F(ThreadLocalTest, TestTLSMember) { const int num_threads = 8; - vector writers_ready; - vector readers_ready; - vector out_strings; - vector > threads; - - ElementDeleter writers_deleter(&writers_ready); - ElementDeleter readers_deleter(&readers_ready); - ElementDeleter out_strings_deleter(&out_strings); + vector> writers_ready; + writers_ready.reserve(num_threads); + vector> readers_ready; + readers_ready.reserve(num_threads); + vector> out_strings; + out_strings.reserve(num_threads); + vector threads; + threads.reserve(num_threads); CountDownLatch all_done(1); CountDownLatch threads_exiting(num_threads); LOG(INFO) << "Starting threads..."; for (int i = 0; i < num_threads; i++) { - writers_ready.push_back(new CountDownLatch(1)); - readers_ready.push_back(new CountDownLatch(1)); - out_strings.push_back(new std::string()); - scoped_refptr new_thread; - CHECK_OK(kudu::Thread::Create("test", strings::Substitute("t$0", i), - &RunAndAssign, writers_ready[i], readers_ready[i], - &all_done, &threads_exiting, Substitute("$0", i), out_strings[i], &new_thread)); - threads.push_back(new_thread); + writers_ready.emplace_back(new CountDownLatch(1)); + readers_ready.emplace_back(new CountDownLatch(1)); + out_strings.emplace_back(new string()); + threads.emplace_back([&, i]() { + RunAndAssign(writers_ready[i].get(), readers_ready[i].get(), &all_done, + &threads_exiting, Substitute("$0", i), out_strings[i].get()); + }); } // Unlatch the threads in order. @@ -324,8 +325,8 @@ TEST_F(ThreadLocalTest, TestTLSMember) { } LOG(INFO) << "Joining & deleting threads..."; - for (scoped_refptr thread : threads) { - CHECK_OK(ThreadJoiner(thread.get()).Join()); + for (auto& t : threads) { + t.join(); } } diff --git a/src/kudu/util/once-test.cc b/src/kudu/util/once-test.cc index 58ebd2c..0db8ec4 100644 --- a/src/kudu/util/once-test.cc +++ b/src/kudu/util/once-test.cc @@ -16,21 +16,19 @@ // under the License. #include +#include #include #include #include -#include "kudu/gutil/ref_counted.h" -#include "kudu/gutil/strings/substitute.h" #include "kudu/util/once.h" #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" +using std::thread; using std::vector; -using strings::Substitute; namespace kudu { @@ -114,16 +112,15 @@ TYPED_TEST(TestOnce, KuduOnceThreadSafeTest) { // The threads will read and write to thing.once_.initted. If access to // it is not synchronized, TSAN will flag the access as data races. - vector > threads; - for (int i = 0; i < 10; i++) { - scoped_refptr t; - ASSERT_OK(Thread::Create("test", Substitute("thread $0", i), - &InitOrGetInitted, &thing, i, &t)); - threads.push_back(t); + constexpr int kNumThreads = 10; + vector threads; + threads.reserve(kNumThreads); + for (int i = 0; i < kNumThreads; i++) { + threads.emplace_back([&thing, i]() { InitOrGetInitted(&thing, i); }); } - for (const scoped_refptr& t : threads) { - t->Join(); + for (auto& t : threads) { + t.join(); } } diff --git a/src/kudu/util/pstack_watcher.cc b/src/kudu/util/pstack_watcher.cc index 2c4481a..4f67e70 100644 --- a/src/kudu/util/pstack_watcher.cc +++ b/src/kudu/util/pstack_watcher.cc @@ -21,6 +21,7 @@ #include #include +#include #include #include diff --git a/src/kudu/util/striped64-test.cc b/src/kudu/util/striped64-test.cc index c74e165..484143b 100644 --- a/src/kudu/util/striped64-test.cc +++ b/src/kudu/util/striped64-test.cc @@ -16,26 +16,29 @@ // under the License. +#include "kudu/util/striped64.h" + #include #include +#include #include #include #include #include -#include "kudu/gutil/ref_counted.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/util/atomic.h" #include "kudu/util/monotime.h" -#include "kudu/util/striped64.h" #include "kudu/util/test_util.h" -#include "kudu/util/thread.h" // These flags are used by the multi-threaded tests, can be used for microbenchmarking. DEFINE_int32(num_operations, 10*1000, "Number of operations to perform"); DEFINE_int32(num_threads, 2, "Number of worker threads"); +using std::thread; +using std::vector; + namespace kudu { // Test some basic operations @@ -57,8 +60,6 @@ TEST(Striped64Test, TestBasic) { template class MultiThreadTest { public: - typedef std::vector > thread_vec_t; - MultiThreadTest(int64_t num_operations, int64_t num_threads) : num_operations_(num_operations), num_threads_(num_threads) { @@ -79,26 +80,24 @@ class MultiThreadTest { void Run() { // Increment for (int i = 0; i < num_threads_; i++) { - scoped_refptr ref; - Thread::Create("Striped64", "Incrementer", &MultiThreadTest::IncrementerThread, this, - num_operations_, &ref); - threads_.push_back(ref); + threads_.emplace_back([this]() { + this->IncrementerThread(this->num_operations_); + }); } - for (const scoped_refptr &t : threads_) { - t->Join(); + for (auto& t : threads_) { + t.join(); } ASSERT_EQ(num_threads_*num_operations_, adder_.Value()); threads_.clear(); // Decrement back to zero for (int i = 0; i < num_threads_; i++) { - scoped_refptr ref; - Thread::Create("Striped64", "Decrementer", &MultiThreadTest::DecrementerThread, this, - num_operations_, &ref); - threads_.push_back(ref); + threads_.emplace_back([this]() { + this->DecrementerThread(this->num_operations_); + }); } - for (const scoped_refptr &t : threads_) { - t->Join(); + for (auto& t : threads_) { + t.join(); } ASSERT_EQ(0, adder_.Value()); } @@ -108,7 +107,7 @@ class MultiThreadTest { int64_t num_operations_; // This is rounded down to the nearest even number int32_t num_threads_; - thread_vec_t threads_; + vector threads_; }; // Test adder implemented by a single AtomicInt for comparison diff --git a/src/kudu/util/test_graph.cc b/src/kudu/util/test_graph.cc index 59f4d30..73b1642 100644 --- a/src/kudu/util/test_graph.cc +++ b/src/kudu/util/test_graph.cc @@ -19,20 +19,19 @@ #include #include +#include #include #include -#include "kudu/gutil/ref_counted.h" #include "kudu/gutil/stringprintf.h" #include "kudu/gutil/walltime.h" #include "kudu/util/faststring.h" #include "kudu/util/monotime.h" -#include "kudu/util/status.h" -#include "kudu/util/thread.h" using std::shared_ptr; using std::string; +using std::thread; namespace kudu { @@ -74,14 +73,13 @@ void TimeSeriesCollector::StartDumperThread() { CHECK(!started_); exit_latch_.Reset(1); started_ = true; - CHECK_OK(kudu::Thread::Create("time series", "dumper", - &TimeSeriesCollector::DumperThread, this, &dumper_thread_)); + dumper_thread_ = thread([this]() { this->DumperThread(); }); } void TimeSeriesCollector::StopDumperThread() { CHECK(started_); exit_latch_.CountDown(); - CHECK_OK(ThreadJoiner(dumper_thread_.get()).Join()); + dumper_thread_.join(); started_ = false; } diff --git a/src/kudu/util/test_graph.h b/src/kudu/util/test_graph.h index 41df430..3f1c70a 100644 --- a/src/kudu/util/test_graph.h +++ b/src/kudu/util/test_graph.h @@ -14,15 +14,14 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -#ifndef KUDU_TEST_GRAPH_COLLECTOR_H -#define KUDU_TEST_GRAPH_COLLECTOR_H +#pragma once #include #include +#include #include #include -#include "kudu/gutil/ref_counted.h" #include "kudu/gutil/macros.h" #include "kudu/gutil/walltime.h" #include "kudu/util/countdown_latch.h" @@ -31,8 +30,7 @@ namespace kudu { -class Thread; -class faststring; +class faststring; // NOLINT class TimeSeries { public: @@ -77,7 +75,7 @@ class TimeSeriesCollector { SeriesMap series_map_; mutable Mutex series_lock_; - scoped_refptr dumper_thread_; + std::thread dumper_thread_; // Latch used to stop the dumper_thread_. When the thread is started, // this is set to 1, and when the thread should exit, it is counted down. @@ -87,4 +85,3 @@ class TimeSeriesCollector { }; } // namespace kudu -#endif