Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2CD0D200C63 for ; Thu, 27 Apr 2017 05:18:24 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 2B5B4160BB4; Thu, 27 Apr 2017 03:18:24 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 32F95160BA8 for ; Thu, 27 Apr 2017 05:18:22 +0200 (CEST) Received: (qmail 78785 invoked by uid 500); 27 Apr 2017 03:18:21 -0000 Mailing-List: contact commits-help@kudu.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kudu.apache.org Delivered-To: mailing list commits@kudu.apache.org Received: (qmail 78762 invoked by uid 99); 27 Apr 2017 03:18:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 27 Apr 2017 03:18:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 393F3E110B; Thu, 27 Apr 2017 03:18:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jdcryans@apache.org To: commits@kudu.apache.org Date: Thu, 27 Apr 2017 03:18:22 -0000 Message-Id: <22f07ebf32594e1bb3337c886970c6d6@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/3] kudu git commit: Simplify MemTracker and move process throttling elsewhere archived-at: Thu, 27 Apr 2017 03:18:24 -0000 Simplify MemTracker and move process throttling elsewhere This takes a first step towards simplifying MemTracker: - Remove the "GC function" callbacks (we never used this) - Remove the 'ExpandLimits' code which was unimplemented. 
- Remove the logging functionality, which we've never used as far as I can remember. - Remove soft limiting. We only used this on the root tracker, so I just moved it to a new process_memory.{h,cc} - Remove 'consumption_func' and un-tie the root tracker from the global process memory usage. Now the root tracker is a simple sum of its descendents. For a stress/benchmark I ran a 500GB YCSB with a memory limit set to 10GB. Results showed no major difference with this patch (throughput was a few percent faster but within the realm of noise). Details at [1] [1] https://docs.google.com/document/d/1dOe5-L5BWUhF-uV4-AE5hduvfUWctXizlaoSLlp38yM/edit?usp=sharing Change-Id: Id16bad7d9a29a83e820a38e9d703811391cffe90 Reviewed-on: http://gerrit.cloudera.org:8080/6620 Tested-by: Kudu Jenkins Reviewed-by: Adar Dembo Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/2be32d54 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/2be32d54 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/2be32d54 Branch: refs/heads/master Commit: 2be32d5441827e0914ad9b5008545003bfda4575 Parents: f03d47e Author: Todd Lipcon Authored: Wed Apr 12 17:06:38 2017 -0700 Committer: Todd Lipcon Committed: Wed Apr 26 21:25:47 2017 +0000 ---------------------------------------------------------------------- src/kudu/consensus/raft_consensus.cc | 3 +- .../integration-tests/raft_consensus-itest.cc | 8 +- src/kudu/server/default-path-handlers.cc | 24 +- src/kudu/tablet/multi_column_writer.cc | 2 +- src/kudu/tserver/tablet_service.cc | 3 +- src/kudu/util/CMakeLists.txt | 1 + src/kudu/util/maintenance_manager-test.cc | 51 ++- src/kudu/util/maintenance_manager.cc | 10 +- src/kudu/util/maintenance_manager.h | 13 +- src/kudu/util/mem_tracker-test.cc | 115 ------- src/kudu/util/mem_tracker.cc | 327 +------------------ src/kudu/util/mem_tracker.h | 153 +-------- src/kudu/util/process_memory.cc | 235 +++++++++++++ src/kudu/util/process_memory.h | 41 
+++ 14 files changed, 373 insertions(+), 613 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/consensus/raft_consensus.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc index 6a87719..f75057f 100644 --- a/src/kudu/consensus/raft_consensus.cc +++ b/src/kudu/consensus/raft_consensus.cc @@ -43,6 +43,7 @@ #include "kudu/util/mem_tracker.h" #include "kudu/util/metrics.h" #include "kudu/util/pb_util.h" +#include "kudu/util/process_memory.h" #include "kudu/util/random.h" #include "kudu/util/random_util.h" #include "kudu/util/threadpool.h" @@ -1188,7 +1189,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request, // This request contains at least one message, and is likely to increase // our memory pressure. double capacity_pct; - if (parent_mem_tracker_->AnySoftLimitExceeded(&capacity_pct)) { + if (process_memory::SoftLimitExceeded(&capacity_pct)) { follower_memory_pressure_rejections_->Increment(); string msg = StringPrintf( "Soft memory limit exceeded (at %.2f%% of capacity)", http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/integration-tests/raft_consensus-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc index 579e3f3..0f15d84 100644 --- a/src/kudu/integration-tests/raft_consensus-itest.cc +++ b/src/kudu/integration-tests/raft_consensus-itest.cc @@ -2251,8 +2251,14 @@ TEST_F(RaftConsensusITest, TestEarlyCommitDespiteMemoryPressure) { master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false"); // Very low memory limit to ease testing. + // When using tcmalloc, we set it to 30MB, since we can get accurate process memory + // usage statistics. 
Otherwise, set to only 4MB, since we'll only be throttling based + // on our tracked memory. +#ifdef TCMALLOC_ENABLED + ts_flags.push_back("--memory_limit_hard_bytes=30000000"); +#else ts_flags.push_back("--memory_limit_hard_bytes=4194304"); - +#endif // Don't let transaction memory tracking get in the way. ts_flags.push_back("--tablet_transaction_memory_limit_mb=-1"); http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/server/default-path-handlers.cc ---------------------------------------------------------------------- diff --git a/src/kudu/server/default-path-handlers.cc b/src/kudu/server/default-path-handlers.cc index 4182c39..f4f9dd5 100644 --- a/src/kudu/server/default-path-handlers.cc +++ b/src/kudu/server/default-path-handlers.cc @@ -38,12 +38,12 @@ #include "kudu/util/flag_tags.h" #include "kudu/util/flags.h" #include "kudu/util/histogram.pb.h" +#include "kudu/util/jsonwriter.h" #include "kudu/util/logging.h" #include "kudu/util/mem_tracker.h" #include "kudu/util/metrics.h" -#include "kudu/util/jsonwriter.h" +#include "kudu/util/process_memory.h" -using boost::replace_all; using std::ifstream; using std::string; using std::endl; @@ -138,13 +138,29 @@ static void MemUsageHandler(const Webserver::WebRequest& req, std::ostringstream MallocExtension::instance()->GetStats(buf, 2048); // Replace new lines with
for html string tmp(buf); - replace_all(tmp, "\n", tags.line_break); + boost::replace_all(tmp, "\n", tags.line_break); (*output) << tmp << tags.end_pre_tag; #endif } // Registered to handle "/mem-trackers", and prints out to handle memory tracker information. -static void MemTrackersHandler(const Webserver::WebRequest& req, std::ostringstream* output) { +static void MemTrackersHandler(const Webserver::WebRequest& /*req*/, std::ostringstream* output) { + *output << "

<h1>Process memory usage</h1>\n"; + *output << "<table class='table table-striped'>\n"; + *output << Substitute(" <tr><th>Total consumption</th><td>$0</td></tr>\n", + HumanReadableNumBytes::ToString(process_memory::CurrentConsumption())); + *output << Substitute(" <tr><th>Memory limit</th><td>$0</td></tr>\n", + HumanReadableNumBytes::ToString(process_memory::HardLimit())); + *output << "</table>\n"; +#ifndef TCMALLOC_ENABLED + *output << R"( + <div class="alert alert-warning"> + <strong>NOTE:</strong> This build of Kudu has not enabled tcmalloc. + The above process memory stats will be inaccurate. + </div> + )"; +#endif + *output << "<h1>Memory usage by subsystem</h1>

\n"; *output << "\n"; *output << " " http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/tablet/multi_column_writer.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/multi_column_writer.cc b/src/kudu/tablet/multi_column_writer.cc index 51d6c29..7b19df8 100644 --- a/src/kudu/tablet/multi_column_writer.cc +++ b/src/kudu/tablet/multi_column_writer.cc @@ -83,10 +83,10 @@ Status MultiColumnWriter::Open() { RETURN_NOT_OK_PREPEND(writer->Start(), "Unable to Start() writer for column " + col.ToString()); - LOG(INFO) << "Opened CFile writer for column " << col.ToString(); cfile_writers_.push_back(writer.release()); block_ids_.push_back(block_id); } + LOG(INFO) << "Opened CFile writers for " << cfile_writers_.size() << " column(s)"; return Status::OK(); } http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/tserver/tablet_service.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc index 91b288c..3fa9abb 100644 --- a/src/kudu/tserver/tablet_service.cc +++ b/src/kudu/tserver/tablet_service.cc @@ -57,6 +57,7 @@ #include "kudu/util/mem_tracker.h" #include "kudu/util/monotime.h" #include "kudu/util/pb_util.h" +#include "kudu/util/process_memory.h" #include "kudu/util/status.h" #include "kudu/util/status_callback.h" #include "kudu/util/trace.h" @@ -747,7 +748,7 @@ void TabletServiceImpl::Write(const WriteRequestPB* req, // Check for memory pressure; don't bother doing any additional work if we've // exceeded the limit. 
double capacity_pct; - if (tablet->mem_tracker()->AnySoftLimitExceeded(&capacity_pct)) { + if (process_memory::SoftLimitExceeded(&capacity_pct)) { tablet->metrics()->leader_memory_pressure_rejections->Increment(); string msg = StringPrintf( "Soft memory limit exceeded (at %.2f%% of capacity)", http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt index 7d049a9..04fe5e8 100644 --- a/src/kudu/util/CMakeLists.txt +++ b/src/kudu/util/CMakeLists.txt @@ -169,6 +169,7 @@ set(UTIL_SRCS path_util.cc pb_util.cc pb_util-internal.cc + process_memory.cc random_util.cc resettable_heartbeater.cc rolling_log.cc http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/maintenance_manager-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/util/maintenance_manager-test.cc b/src/kudu/util/maintenance_manager-test.cc index e5dcc96..98dc9ba 100644 --- a/src/kudu/util/maintenance_manager-test.cc +++ b/src/kudu/util/maintenance_manager-test.cc @@ -15,14 +15,15 @@ // specific language governing permissions and limitations // under the License. 
-#include -#include +#include #include #include #include +#include +#include + #include "kudu/gutil/strings/substitute.h" -#include "kudu/tablet/tablet.pb.h" #include "kudu/util/maintenance_manager.h" #include "kudu/util/mem_tracker.h" #include "kudu/util/metrics.h" @@ -53,13 +54,15 @@ static const char kFakeUuid[] = "12345"; class MaintenanceManagerTest : public KuduTest { public: void SetUp() override { - test_tracker_ = MemTracker::CreateTracker(1000, "test"); MaintenanceManager::Options options; options.num_threads = 2; options.polling_interval_ms = 1; options.history_size = kHistorySize; - options.parent_mem_tracker = test_tracker_; manager_.reset(new MaintenanceManager(options)); + manager_->set_memory_pressure_func_for_tests( + [&](double* consumption) { + return indicate_memory_pressure_.load(); + }); ASSERT_OK(manager_->Init(kFakeUuid)); } @@ -68,8 +71,8 @@ class MaintenanceManagerTest : public KuduTest { } protected: - shared_ptr test_tracker_; shared_ptr manager_; + std::atomic indicate_memory_pressure_ { false }; }; // Just create the MaintenanceManager and then shut it down, to make sure @@ -80,10 +83,9 @@ TEST_F(MaintenanceManagerTest, TestCreateAndShutdown) { class TestMaintenanceOp : public MaintenanceOp { public: TestMaintenanceOp(const std::string& name, - IOUsage io_usage, - const shared_ptr& tracker) + IOUsage io_usage) : MaintenanceOp(name, io_usage), - consumption_(tracker, 500), + ram_anchored_(500), logs_retained_bytes_(0), perf_improvement_(0), metric_entity_(METRIC_ENTITY_test.Instantiate(&metric_registry_, "test")), @@ -124,7 +126,7 @@ class TestMaintenanceOp : public MaintenanceOp { virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE { std::lock_guard guard(lock_); stats->set_runnable(remaining_runs_ > 0); - stats->set_ram_anchored(consumption_.consumption()); + stats->set_ram_anchored(ram_anchored_); stats->set_logs_retained_bytes(logs_retained_bytes_); stats->set_perf_improvement(perf_improvement_); } @@ -141,7 +143,7 @@ class 
TestMaintenanceOp : public MaintenanceOp { void set_ram_anchored(uint64_t ram_anchored) { std::lock_guard guard(lock_); - consumption_.Reset(ram_anchored); + ram_anchored_ = ram_anchored; } void set_logs_retained_bytes(uint64_t logs_retained_bytes) { @@ -165,7 +167,7 @@ class TestMaintenanceOp : public MaintenanceOp { private: Mutex lock_; - ScopedTrackedConsumption consumption_; + uint64_t ram_anchored_; uint64_t logs_retained_bytes_; uint64_t perf_improvement_; MetricRegistry metric_registry_; @@ -187,8 +189,8 @@ class TestMaintenanceOp : public MaintenanceOp { // running and verify that UnregisterOp waits for it to finish before // proceeding. TEST_F(MaintenanceManagerTest, TestRegisterUnregister) { - TestMaintenanceOp op1("1", MaintenanceOp::HIGH_IO_USAGE, test_tracker_); - op1.set_ram_anchored(1001); + TestMaintenanceOp op1("1", MaintenanceOp::HIGH_IO_USAGE); + op1.set_perf_improvement(10); // Register initially with no remaining runs. We'll later enable it once it's // already registered. op1.set_remaining_runs(0); @@ -207,8 +209,8 @@ TEST_F(MaintenanceManagerTest, TestRegisterUnregister) { // Regression test for KUDU-1495: when an operation is being unregistered, // new instances of that operation should not be scheduled. TEST_F(MaintenanceManagerTest, TestNewOpsDontGetScheduledDuringUnregister) { - TestMaintenanceOp op1("1", MaintenanceOp::HIGH_IO_USAGE, test_tracker_); - op1.set_ram_anchored(1001); + TestMaintenanceOp op1("1", MaintenanceOp::HIGH_IO_USAGE); + op1.set_perf_improvement(10); // Set the op to run up to 10 times, and each time should sleep for a second. op1.set_remaining_runs(10); @@ -231,7 +233,7 @@ TEST_F(MaintenanceManagerTest, TestNewOpsDontGetScheduledDuringUnregister) { // Test that we'll run an operation that doesn't improve performance when memory // pressure gets high. 
TEST_F(MaintenanceManagerTest, TestMemoryPressure) { - TestMaintenanceOp op("op", MaintenanceOp::HIGH_IO_USAGE, test_tracker_); + TestMaintenanceOp op("op", MaintenanceOp::HIGH_IO_USAGE); op.set_ram_anchored(100); manager_->RegisterOp(&op); @@ -239,16 +241,13 @@ TEST_F(MaintenanceManagerTest, TestMemoryPressure) { SleepFor(MonoDelta::FromMilliseconds(20)); ASSERT_EQ(0, op.DurationHistogram()->TotalCount()); - // set the ram_anchored by the high mem op so high that we'll have to run it. - scoped_refptr thread; - ASSERT_OK(Thread::Create("TestThread", "MaintenanceManagerTest", - boost::bind(&TestMaintenanceOp::set_ram_anchored, &op, 1100), &thread)); + // Fake that the server is under memory pressure. + indicate_memory_pressure_ = true; AssertEventually([&]() { ASSERT_EQ(op.DurationHistogram()->TotalCount(), 1); }); manager_->UnregisterOp(&op); - ThreadJoiner(thread.get()).Join(); } // Test that ops are prioritized correctly when we add log retention. @@ -257,15 +256,15 @@ TEST_F(MaintenanceManagerTest, TestLogRetentionPrioritization) { manager_->Shutdown(); - TestMaintenanceOp op1("op1", MaintenanceOp::LOW_IO_USAGE, test_tracker_); + TestMaintenanceOp op1("op1", MaintenanceOp::LOW_IO_USAGE); op1.set_ram_anchored(0); op1.set_logs_retained_bytes(100 * kMB); - TestMaintenanceOp op2("op2", MaintenanceOp::HIGH_IO_USAGE, test_tracker_); + TestMaintenanceOp op2("op2", MaintenanceOp::HIGH_IO_USAGE); op2.set_ram_anchored(100); op2.set_logs_retained_bytes(100 * kMB); - TestMaintenanceOp op3("op3", MaintenanceOp::HIGH_IO_USAGE, test_tracker_); + TestMaintenanceOp op3("op3", MaintenanceOp::HIGH_IO_USAGE); op3.set_ram_anchored(200); op3.set_logs_retained_bytes(100 * kMB); @@ -299,7 +298,7 @@ TEST_F(MaintenanceManagerTest, TestLogRetentionPrioritization) { TEST_F(MaintenanceManagerTest, TestCompletedOpsHistory) { for (int i = 0; i < 5; i++) { string name = Substitute("op$0", i); - TestMaintenanceOp op(name, MaintenanceOp::HIGH_IO_USAGE, test_tracker_); + TestMaintenanceOp 
op(name, MaintenanceOp::HIGH_IO_USAGE); op.set_perf_improvement(1); op.set_ram_anchored(100); manager_->RegisterOp(&op); http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/maintenance_manager.cc ---------------------------------------------------------------------- diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc index eda71e8..d2206dc 100644 --- a/src/kudu/util/maintenance_manager.cc +++ b/src/kudu/util/maintenance_manager.cc @@ -29,8 +29,8 @@ #include "kudu/util/debug/trace_logging.h" #include "kudu/util/flag_tags.h" #include "kudu/util/logging.h" -#include "kudu/util/mem_tracker.h" #include "kudu/util/metrics.h" +#include "kudu/util/process_memory.h" #include "kudu/util/random_util.h" #include "kudu/util/stopwatch.h" #include "kudu/util/thread.h" @@ -115,7 +115,6 @@ const MaintenanceManager::Options MaintenanceManager::DEFAULT_OPTIONS = { .num_threads = 0, .polling_interval_ms = 0, .history_size = 0, - .parent_mem_tracker = shared_ptr(), }; MaintenanceManager::MaintenanceManager(const Options& options) @@ -128,9 +127,8 @@ MaintenanceManager::MaintenanceManager(const Options& options) FLAGS_maintenance_manager_polling_interval_ms : options.polling_interval_ms), completed_ops_count_(0), - parent_mem_tracker_(!options.parent_mem_tracker ? - MemTracker::GetRootTracker() : options.parent_mem_tracker), - rand_(GetRandomSeed32()) { + rand_(GetRandomSeed32()), + memory_pressure_func_(&process_memory::SoftLimitExceeded) { CHECK_OK(ThreadPoolBuilder("MaintenanceMgr").set_min_threads(num_threads_) .set_max_threads(num_threads_).Build(&thread_pool_)); uint32_t history_size = options.history_size == 0 ? @@ -364,7 +362,7 @@ MaintenanceOp* MaintenanceManager::FindBestOp() { // Look at free memory. If it is dangerously low, we must select something // that frees memory-- the op with the most anchored memory. 
double capacity_pct; - if (parent_mem_tracker_->AnySoftLimitExceeded(&capacity_pct)) { + if (memory_pressure_func_(&capacity_pct)) { if (!most_mem_anchored_op) { string msg = StringPrintf("we have exceeded our soft memory limit " "(current capacity is %.2f%%). However, there are no ops currently " http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/maintenance_manager.h ---------------------------------------------------------------------- diff --git a/src/kudu/util/maintenance_manager.h b/src/kudu/util/maintenance_manager.h index 5c432a7..6070e2d 100644 --- a/src/kudu/util/maintenance_manager.h +++ b/src/kudu/util/maintenance_manager.h @@ -19,6 +19,7 @@ #include +#include #include #include #include @@ -42,7 +43,6 @@ template class AtomicGauge; class Histogram; class MaintenanceManager; -class MemTracker; class MaintenanceOpStats { public: @@ -259,7 +259,6 @@ class MaintenanceManager : public std::enable_shared_from_this parent_mem_tracker; }; explicit MaintenanceManager(const Options& options); @@ -278,6 +277,11 @@ class MaintenanceManager : public std::enable_shared_from_this f) { + std::lock_guard guard(lock_); + memory_pressure_func_ = std::move(f); + } + static const Options DEFAULT_OPTIONS; private: @@ -307,10 +311,13 @@ class MaintenanceManager : public std::enable_shared_from_this completed_ops_; int64_t completed_ops_count_; - std::shared_ptr parent_mem_tracker_; std::string server_uuid_; Random rand_; + // Function which should return true if the server is under global memory pressure. + // This is indirected for testing purposes. 
+ std::function memory_pressure_func_; + DISALLOW_COPY_AND_ASSIGN(MaintenanceManager); }; http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/mem_tracker-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/util/mem_tracker-test.cc b/src/kudu/util/mem_tracker-test.cc index 11bf2a2..7e78cbe 100644 --- a/src/kudu/util/mem_tracker-test.cc +++ b/src/kudu/util/mem_tracker-test.cc @@ -24,14 +24,9 @@ #include #include -#include -#include - #include "kudu/gutil/strings/substitute.h" #include "kudu/util/test_util.h" -DECLARE_int32(memory_limit_soft_percentage); - namespace kudu { using std::equal_to; @@ -127,54 +122,6 @@ class GcFunctionHelper { MemTracker* tracker_; }; -TEST(MemTrackerTest, GcFunctions) { - shared_ptr t = MemTracker::CreateTracker(10, ""); - ASSERT_TRUE(t->has_limit()); - - t->Consume(9); - EXPECT_FALSE(t->LimitExceeded()); - - // Test TryConsume() - EXPECT_FALSE(t->TryConsume(2)); - EXPECT_EQ(t->consumption(), 9); - EXPECT_FALSE(t->LimitExceeded()); - - // Attach GcFunction that releases 1 byte - GcFunctionHelper gc_func_helper(t.get()); - t->AddGcFunction(boost::bind(&GcFunctionHelper::GcFunc, &gc_func_helper)); - EXPECT_TRUE(t->TryConsume(2)); - EXPECT_EQ(t->consumption(), 10); - EXPECT_FALSE(t->LimitExceeded()); - - // GcFunction will be called even though TryConsume() fails - EXPECT_FALSE(t->TryConsume(2)); - EXPECT_EQ(t->consumption(), 9); - EXPECT_FALSE(t->LimitExceeded()); - - // GcFunction won't be called - EXPECT_TRUE(t->TryConsume(1)); - EXPECT_EQ(t->consumption(), 10); - EXPECT_FALSE(t->LimitExceeded()); - - // Test LimitExceeded() - t->Consume(1); - EXPECT_EQ(t->consumption(), 11); - EXPECT_FALSE(t->LimitExceeded()); - EXPECT_EQ(t->consumption(), 10); - - // Add more GcFunctions, test that we only call them until the limit is no longer - // exceeded - GcFunctionHelper gc_func_helper2(t.get()); - t->AddGcFunction(boost::bind(&GcFunctionHelper::GcFunc, &gc_func_helper2)); - 
GcFunctionHelper gc_func_helper3(t.get()); - t->AddGcFunction(boost::bind(&GcFunctionHelper::GcFunc, &gc_func_helper3)); - t->Consume(1); - EXPECT_EQ(t->consumption(), 11); - EXPECT_FALSE(t->LimitExceeded()); - EXPECT_EQ(t->consumption(), 10); - t->Release(10); -} - TEST(MemTrackerTest, STLContainerAllocator) { shared_ptr t = MemTracker::CreateTracker(-1, "t"); MemTrackerAllocator vec_alloc(t); @@ -249,68 +196,6 @@ TEST(MemTrackerTest, ScopedTrackedConsumption) { ASSERT_EQ(0, m->consumption()); } -TEST(MemTrackerTest, SoftLimitExceeded) { - const int kNumIters = 100000; - const int kMemLimit = 1000; - google::FlagSaver saver; - FLAGS_memory_limit_soft_percentage = 0; - shared_ptr m = MemTracker::CreateTracker(kMemLimit, "test"); - - // Consumption is 0; the soft limit is never exceeded. - for (int i = 0; i < kNumIters; i++) { - ASSERT_FALSE(m->SoftLimitExceeded(nullptr)); - } - - // Consumption is half of the actual limit, so we expect to exceed the soft - // limit roughly half the time. - ScopedTrackedConsumption consumption(m, kMemLimit / 2); - int exceeded_count = 0; - for (int i = 0; i < kNumIters; i++) { - double current_percentage; - if (m->SoftLimitExceeded(¤t_percentage)) { - exceeded_count++; - ASSERT_NEAR(50, current_percentage, 0.1); - } - } - double exceeded_pct = static_cast(exceeded_count) / kNumIters * 100; - ASSERT_TRUE(exceeded_pct > 47 && exceeded_pct < 52); - - // Consumption is over the limit; the soft limit is always exceeded. - consumption.Reset(kMemLimit + 1); - for (int i = 0; i < kNumIters; i++) { - double current_percentage; - ASSERT_TRUE(m->SoftLimitExceeded(¤t_percentage)); - ASSERT_NEAR(100, current_percentage, 0.1); - } -} - -#ifdef TCMALLOC_ENABLED -TEST(MemTrackerTest, TcMallocRootTracker) { - shared_ptr root = MemTracker::GetRootTracker(); - - // The root tracker's consumption and tcmalloc should agree. 
- size_t value; - root->UpdateConsumption(); - ASSERT_TRUE(MallocExtension::instance()->GetNumericProperty( - "generic.current_allocated_bytes", &value)); - ASSERT_EQ(value, root->consumption()); - - // Explicit Consume() and Release() have no effect. - root->Consume(100); - ASSERT_EQ(value, root->consumption()); - root->Release(3); - ASSERT_EQ(value, root->consumption()); - - // But if we allocate something really big, we should see a change. - gscoped_ptr big_alloc(new char[4*1024*1024]); - // clang in release mode can optimize out the above allocation unless - // we do something with the pointer... so we just log it. - VLOG(8) << static_cast(big_alloc.get()); - root->UpdateConsumption(); - ASSERT_GT(root->consumption(), value); -} -#endif - TEST(MemTrackerTest, CollisionDetection) { shared_ptr p = MemTracker::CreateTracker(-1, "parent"); shared_ptr c = MemTracker::CreateTracker(-1, "child", p); http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/mem_tracker.cc ---------------------------------------------------------------------- diff --git a/src/kudu/util/mem_tracker.cc b/src/kudu/util/mem_tracker.cc index b247cb4..88e1d23 100644 --- a/src/kudu/util/mem_tracker.cc +++ b/src/kudu/util/mem_tracker.cc @@ -23,49 +23,14 @@ #include #include #include -#include - -#include #include "kudu/gutil/map-util.h" #include "kudu/gutil/once.h" -#include "kudu/gutil/strings/join.h" -#include "kudu/gutil/strings/human_readable.h" #include "kudu/gutil/strings/substitute.h" -#include "kudu/util/debug-util.h" -#include "kudu/util/debug/trace_event.h" -#include "kudu/util/env.h" -#include "kudu/util/flag_tags.h" #include "kudu/util/mutex.h" -#include "kudu/util/random_util.h" +#include "kudu/util/process_memory.h" #include "kudu/util/status.h" -DEFINE_int64(memory_limit_hard_bytes, 0, - "Maximum amount of memory this daemon should use, in bytes. " - "A value of 0 autosizes based on the total system memory. 
" - "A value of -1 disables all memory limiting."); -TAG_FLAG(memory_limit_hard_bytes, stable); - -DEFINE_int32(memory_limit_soft_percentage, 60, - "Percentage of the hard memory limit that this daemon may " - "consume before memory throttling of writes begins. The greater " - "the excess, the higher the chance of throttling. In general, a " - "lower soft limit leads to smoother write latencies but " - "decreased throughput, and vice versa for a higher soft limit."); -TAG_FLAG(memory_limit_soft_percentage, advanced); - -DEFINE_int32(memory_limit_warn_threshold_percentage, 98, - "Percentage of the hard memory limit that this daemon may " - "consume before WARNING level messages are periodically logged."); -TAG_FLAG(memory_limit_warn_threshold_percentage, advanced); - -#ifdef TCMALLOC_ENABLED -DEFINE_int32(tcmalloc_max_free_bytes_percentage, 10, - "Maximum percentage of the RSS that tcmalloc is allowed to use for " - "reserved but unallocated memory."); -TAG_FLAG(tcmalloc_max_free_bytes_percentage, advanced); -#endif - namespace kudu { // NOTE: this class has been adapted from Impala, so the code style varies @@ -73,7 +38,6 @@ namespace kudu { using std::deque; using std::list; -using std::ostringstream; using std::shared_ptr; using std::string; using std::vector; @@ -85,94 +49,29 @@ using strings::Substitute; static shared_ptr root_tracker; static GoogleOnceType root_tracker_once = GOOGLE_ONCE_INIT; -// Total amount of memory from calls to Release() since the last GC. If this -// is greater than GC_RELEASE_SIZE, this will trigger a tcmalloc gc. -static Atomic64 released_memory_since_gc; - -// Validate that various flags are percentages. 
-static bool ValidatePercentage(const char* flagname, int value) { - if (value >= 0 && value <= 100) { - return true; - } - LOG(ERROR) << Substitute("$0 must be a percentage, value $1 is invalid", - flagname, value); - return false; -} -static bool dummy[] = { - google::RegisterFlagValidator(&FLAGS_memory_limit_soft_percentage, &ValidatePercentage), - google::RegisterFlagValidator(&FLAGS_memory_limit_warn_threshold_percentage, &ValidatePercentage) -#ifdef TCMALLOC_ENABLED - ,google::RegisterFlagValidator(&FLAGS_tcmalloc_max_free_bytes_percentage, &ValidatePercentage) -#endif -}; - -#ifdef TCMALLOC_ENABLED -static int64_t GetTCMallocProperty(const char* prop) { - size_t value; - if (!MallocExtension::instance()->GetNumericProperty(prop, &value)) { - LOG(DFATAL) << "Failed to get tcmalloc property " << prop; - } - return value; -} - -static int64_t GetTCMallocCurrentAllocatedBytes() { - return GetTCMallocProperty("generic.current_allocated_bytes"); -} -#endif - void MemTracker::CreateRootTracker() { - int64_t limit = FLAGS_memory_limit_hard_bytes; - if (limit == 0) { - // If no limit is provided, we'll use 80% of system RAM. - int64_t total_ram; - CHECK_OK(Env::Default()->GetTotalRAMBytes(&total_ram)); - limit = total_ram * 4; - limit /= 5; - } - - ConsumptionFunction f; -#ifdef TCMALLOC_ENABLED - f = &GetTCMallocCurrentAllocatedBytes; -#endif - root_tracker.reset(new MemTracker(f, limit, "root", - shared_ptr())); + root_tracker.reset(new MemTracker(-1, "root", shared_ptr())); root_tracker->Init(); - LOG(INFO) << StringPrintf("MemTracker: hard memory limit is %.6f GB", - (static_cast(limit) / (1024.0 * 1024.0 * 1024.0))); - LOG(INFO) << StringPrintf("MemTracker: soft memory limit is %.6f GB", - (static_cast(root_tracker->soft_limit_) / - (1024.0 * 1024.0 * 1024.0))); } shared_ptr MemTracker::CreateTracker(int64_t byte_limit, const string& id, const shared_ptr& parent) { shared_ptr real_parent = parent ? 
parent : GetRootTracker(); - shared_ptr tracker( - new MemTracker(ConsumptionFunction(), byte_limit, id, real_parent)); + shared_ptr tracker(new MemTracker(byte_limit, id, real_parent)); real_parent->AddChildTracker(tracker); tracker->Init(); return tracker; } -MemTracker::MemTracker(ConsumptionFunction consumption_func, int64_t byte_limit, - const string& id, shared_ptr parent) +MemTracker::MemTracker(int64_t byte_limit, const string& id, shared_ptr parent) : limit_(byte_limit), id_(id), descr_(Substitute("memory consumption for $0", id)), parent_(std::move(parent)), - consumption_(0), - consumption_func_(std::move(consumption_func)), - rand_(GetRandomSeed32()), - enable_logging_(false), - log_stack_(false) { + consumption_(0) { VLOG(1) << "Creating tracker " << ToString(); - if (consumption_func_) { - UpdateConsumption(); - } - soft_limit_ = (limit_ == -1) - ? -1 : (limit_ * FLAGS_memory_limit_soft_percentage) / 100; } MemTracker::~MemTracker() { @@ -284,67 +183,35 @@ void MemTracker::ListTrackers(vector>* trackers) { } } -void MemTracker::UpdateConsumption() { - DCHECK(!consumption_func_.empty()); - DCHECK(parent_.get() == NULL); - consumption_.set_value(consumption_func_()); -} - void MemTracker::Consume(int64_t bytes) { if (bytes < 0) { Release(-bytes); return; } - if (!consumption_func_.empty()) { - UpdateConsumption(); - return; - } if (bytes == 0) { return; } - if (PREDICT_FALSE(enable_logging_)) { - LogUpdate(true, bytes); - } for (auto& tracker : all_trackers_) { tracker->consumption_.IncrementBy(bytes); - if (!tracker->consumption_func_.empty()) { - DCHECK_GE(tracker->consumption_.current_value(), 0); - } } } bool MemTracker::TryConsume(int64_t bytes) { - if (!consumption_func_.empty()) { - UpdateConsumption(); - } if (bytes <= 0) { + Release(-bytes); return true; } - if (PREDICT_FALSE(enable_logging_)) { - LogUpdate(true, bytes); - } int i = 0; - // Walk the tracker tree top-down, to avoid expanding a limit on a child whose parent - // won't accommodate 
the change. + // Walk the tracker tree top-down, consuming memory from each in turn. for (i = all_trackers_.size() - 1; i >= 0; --i) { MemTracker *tracker = all_trackers_[i]; if (tracker->limit_ < 0) { tracker->consumption_.IncrementBy(bytes); } else { if (!tracker->consumption_.TryIncrementBy(bytes, tracker->limit_)) { - // One of the trackers failed, attempt to GC memory or expand our limit. If that - // succeeds, TryUpdate() again. Bail if either fails. - if (!tracker->GcMemory(tracker->limit_ - bytes) || - tracker->ExpandLimit(bytes)) { - if (!tracker->consumption_.TryIncrementBy( - bytes, tracker->limit_)) { - break; - } - } else { - break; - } + break; } } } @@ -354,14 +221,10 @@ bool MemTracker::TryConsume(int64_t bytes) { } // Someone failed, roll back the ones that succeeded. - // TODO: this doesn't roll it back completely since the max values for + // TODO(todd): this doesn't roll it back completely since the max values for // the updated trackers aren't decremented. The max values are only used // for error reporting so this is probably okay. Rolling those back is // pretty hard; we'd need something like 2PC. - // - // TODO: This might leave us with an allocated resource that we can't use. Do we need - // to adjust the consumption of the query tracker to stop the resource from never - // getting used by a subsequent TryConsume()? 
for (int j = all_trackers_.size() - 1; j > i; --j) { all_trackers_[j]->consumption_.IncrementBy(-bytes); } @@ -374,35 +237,14 @@ void MemTracker::Release(int64_t bytes) { return; } - if (PREDICT_FALSE(base::subtle::Barrier_AtomicIncrement(&released_memory_since_gc, bytes) > - GC_RELEASE_SIZE)) { - GcTcmalloc(); - } - - if (!consumption_func_.empty()) { - UpdateConsumption(); - return; - } - if (bytes == 0) { return; } - if (PREDICT_FALSE(enable_logging_)) { - LogUpdate(false, bytes); - } for (auto& tracker : all_trackers_) { tracker->consumption_.IncrementBy(-bytes); - // If a UDF calls FunctionContext::TrackAllocation() but allocates less than the - // reported amount, the subsequent call to FunctionContext::Free() may cause the - // process mem tracker to go negative until it is synced back to the tcmalloc - // metric. Don't blow up in this case. (Note that this doesn't affect non-process - // trackers since we can enforce that the reported memory usage is internally - // consistent.) - if (!tracker->consumption_func_.empty()) { - DCHECK_GE(tracker->consumption_.current_value(), 0); - } } + process_memory::MaybeGCAfterRelease(bytes); } bool MemTracker::AnyLimitExceeded() { @@ -414,55 +256,6 @@ bool MemTracker::AnyLimitExceeded() { return false; } -bool MemTracker::LimitExceeded() { - if (PREDICT_FALSE(CheckLimitExceeded())) { - return GcMemory(limit_); - } - return false; -} - -bool MemTracker::SoftLimitExceeded(double* current_capacity_pct) { - // Did we exceed the actual limit? - if (LimitExceeded()) { - if (current_capacity_pct) { - *current_capacity_pct = - static_cast(consumption()) / limit() * 100; - } - return true; - } - - // No soft limit defined. - if (!has_limit() || limit_ == soft_limit_) { - return false; - } - - // Are we under the soft limit threshold? - int64_t usage = consumption(); - if (usage < soft_limit_) { - return false; - } - - // We're over the threshold; were we randomly chosen to be over the soft limit? 
- if (usage + rand_.Uniform64(limit_ - soft_limit_) > limit_) { - bool exceeded = GcMemory(soft_limit_); - if (exceeded && current_capacity_pct) { - *current_capacity_pct = - static_cast(consumption()) / limit() * 100; - } - return exceeded; - } - return false; -} - -bool MemTracker::AnySoftLimitExceeded(double* current_capacity_pct) { - for (MemTracker* t : limit_trackers_) { - if (t->SoftLimitExceeded(current_capacity_pct)) { - return true; - } - } - return false; -} - int64_t MemTracker::SpareCapacity() const { int64_t result = std::numeric_limits::max(); for (const auto& tracker : limit_trackers_) { @@ -472,84 +265,6 @@ int64_t MemTracker::SpareCapacity() const { return result; } -bool MemTracker::GcMemory(int64_t max_consumption) { - if (max_consumption < 0) { - // Impossible to GC enough memory to reach the goal. - return true; - } - - std::lock_guard l(gc_lock_); - if (!consumption_func_.empty()) { - UpdateConsumption(); - } - uint64_t pre_gc_consumption = consumption(); - // Check if someone gc'd before us - if (pre_gc_consumption < max_consumption) { - return false; - } - - // Try to free up some memory - for (const auto& gc_function : gc_functions_) { - gc_function(); - if (!consumption_func_.empty()) { - UpdateConsumption(); - } - if (consumption() <= max_consumption) { - break; - } - } - - return consumption() > max_consumption; -} - -void MemTracker::GcTcmalloc() { -#ifdef TCMALLOC_ENABLED - released_memory_since_gc = 0; - TRACE_EVENT0("process", "MemTracker::GcTcmalloc"); - - // Number of bytes in the 'NORMAL' free list (i.e reserved by tcmalloc but - // not in use). - int64_t bytes_overhead = GetTCMallocProperty("tcmalloc.pageheap_free_bytes"); - // Bytes allocated by the application. 
- int64_t bytes_used = GetTCMallocCurrentAllocatedBytes(); - - int64_t max_overhead = bytes_used * FLAGS_tcmalloc_max_free_bytes_percentage / 100.0; - if (bytes_overhead > max_overhead) { - int64_t extra = bytes_overhead - max_overhead; - while (extra > 0) { - // Release 1MB at a time, so that tcmalloc releases its page heap lock - // allowing other threads to make progress. This still disrupts the current - // thread, but is better than disrupting all. - MallocExtension::instance()->ReleaseToSystem(1024 * 1024); - extra -= 1024 * 1024; - } - } - -#else - // Nothing to do if not using tcmalloc. -#endif -} - -string MemTracker::LogUsage(const string& prefix) const { - ostringstream ss; - ss << prefix << id_ << ":"; - if (CheckLimitExceeded()) { - ss << " memory limit exceeded."; - } - if (limit_ > 0) { - ss << " Limit=" << HumanReadableNumBytes::ToString(limit_); - } - ss << " Consumption=" << HumanReadableNumBytes::ToString(consumption()); - - ostringstream prefix_ss; - prefix_ss << prefix << " "; - string new_prefix = prefix_ss.str(); - MutexLock l(child_trackers_lock_); - if (!child_trackers_.empty()) { - ss << "\n" << LogUsage(new_prefix, child_trackers_); - } - return ss.str(); -} void MemTracker::Init() { // populate all_trackers_ and limit_trackers_ @@ -568,28 +283,6 @@ void MemTracker::AddChildTracker(const shared_ptr& tracker) { tracker->child_tracker_it_ = child_trackers_.insert(child_trackers_.end(), tracker); } -void MemTracker::LogUpdate(bool is_consume, int64_t bytes) const { - ostringstream ss; - ss << this << " " << (is_consume ? 
"Consume: " : "Release: ") << bytes - << " Consumption: " << consumption() << " Limit: " << limit_; - if (log_stack_) { - ss << std::endl << GetStackTrace(); - } - LOG(ERROR) << ss.str(); -} - -string MemTracker::LogUsage(const string& prefix, - const list>& trackers) { - vector usage_strings; - for (const auto& child_weak : trackers) { - shared_ptr child = child_weak.lock(); - if (child) { - usage_strings.push_back(child->LogUsage(prefix)); - } - } - return JoinStrings(usage_strings, "\n"); -} - shared_ptr MemTracker::GetRootTracker() { GoogleOnceInit(&root_tracker_once, &MemTracker::CreateRootTracker); return root_tracker; http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/mem_tracker.h ---------------------------------------------------------------------- diff --git a/src/kudu/util/mem_tracker.h b/src/kudu/util/mem_tracker.h index cd00e22..a43e9a2 100644 --- a/src/kudu/util/mem_tracker.h +++ b/src/kudu/util/mem_tracker.h @@ -17,7 +17,6 @@ #ifndef KUDU_UTIL_MEM_TRACKER_H #define KUDU_UTIL_MEM_TRACKER_H -#include #include #include #include @@ -28,7 +27,6 @@ #include "kudu/util/high_water_mark.h" #include "kudu/util/locks.h" #include "kudu/util/mutex.h" -#include "kudu/util/random.h" namespace kudu { @@ -39,8 +37,8 @@ class MemTracker; // arranged into a tree structure such that the consumption tracked by a // MemTracker is also tracked by its ancestors. // -// The MemTracker hierarchy is rooted in a single static MemTracker whose limit -// is set via gflag. The root MemTracker always exists, and it is the common +// The MemTracker hierarchy is rooted in a single static MemTracker. +// The root MemTracker always exists, and it is the common // ancestor to all MemTrackers. 
All operations that discover MemTrackers begin // at the root and work their way down the tree, while operations that deal // with adjusting memory consumption begin at a particular MemTracker and work @@ -55,51 +53,21 @@ class MemTracker; // and the parent has a weak reference to it. Both remain for the lifetime of // the MemTracker. // -// By default, memory consumption is tracked via calls to Consume()/Release(), either to -// the tracker itself or to one of its descendents. Alternatively, a consumption function -// can specified, and then the function's value is used as the consumption rather than the -// tally maintained by Consume() and Release(). A tcmalloc function is used to track process -// memory consumption, since the process memory usage may be higher than the computed -// total memory (tcmalloc does not release deallocated memory immediately). -// -// GcFunctions can be attached to a MemTracker in order to free up memory if the limit is -// reached. If LimitExceeded() is called and the limit is exceeded, it will first call the -// GcFunctions to try to free memory and recheck the limit. For example, the process -// tracker has a GcFunction that releases any unused memory still held by tcmalloc, so -// this will be called before the process limit is reported as exceeded. GcFunctions are -// called in the order they are added, so expensive functions should be added last. +// Memory consumption is tracked via calls to Consume()/Release(), either to +// the tracker itself or to one of its descendants. // // This class is thread-safe. -// -// NOTE: this class has been partially ported over from Impala with -// several changes, and as a result the style differs somewhat from -// the Kudu style. -// -// Changes from Impala: -// 1) Id a string vs. a TUniqueId -// 2) There is no concept of query trackers vs. pool trackers -- trackers are instead -// associated with objects. 
Parent hierarchy is preserved, with the assumption that, -// e.g., a tablet server's memtracker will have as its children the tablets' memtrackers, -// which in turn will have memtrackers for their caches, logs, and so forth. -// -// TODO: this classes uses a lot of statics fields and methods, which -// isn't common in Kudu. It is probably wise to later move the -// 'registry' of trackers to a separate class, but it's better to -// start using the 'class' *first* and then change this functionality, -// depending on how MemTracker ends up being used in Kudu. class MemTracker : public std::enable_shared_from_this { public: - - // Signature for function that can be called to free some memory after limit is reached. - typedef boost::function GcFunction; - ~MemTracker(); // Creates and adds the tracker to the tree so that it can be retrieved with // FindTracker/FindOrCreateTracker. // - // byte_limit < 0 means no limit; 'id' is a used as a label for LogUsage() - // and web UI. Use the two-argument form if there is no parent. + // byte_limit < 0 means no limit; 'id' is a used as a label to uniquely identify + // the MemTracker for the below Find...() calls as well as the web UI. + // + // Use the two-argument form if there is no parent. static std::shared_ptr CreateTracker( int64_t byte_limit, const std::string& id, @@ -133,19 +101,9 @@ class MemTracker : public std::enable_shared_from_this { // Gets a shared_ptr to the "root" tracker, creating it if necessary. static std::shared_ptr GetRootTracker(); - // Updates consumption from the consumption function specified in the constructor. - // NOTE: this method will crash if 'consumption_func_' is not set. - void UpdateConsumption(); - // Increases consumption of this tracker and its ancestors by 'bytes'. void Consume(int64_t bytes); - // Try to expand the limit (by asking the resource broker for more memory) by at least - // 'bytes'. Returns false if not possible, true if the request succeeded. 
May allocate - // more memory than was requested. - // TODO: always returns false for now, not yet implemented. - bool ExpandLimit(int64_t /* unused: bytes */) { return false; } - // Increases consumption of this tracker and its ancestors by 'bytes' only if // they can all consume 'bytes'. If this brings any of them over, none of them // are updated. @@ -153,6 +111,9 @@ class MemTracker : public std::enable_shared_from_this { bool TryConsume(int64_t bytes); // Decreases consumption of this tracker and its ancestors by 'bytes'. + // + // This will also cause the process to periodically trigger tcmalloc "ReleaseMemory" + // to ensure that memory is released to the OS. void Release(int64_t bytes); // Returns true if a valid limit of this tracker or one of its ancestors is @@ -162,21 +123,9 @@ class MemTracker : public std::enable_shared_from_this { // If this tracker has a limit, checks the limit and attempts to free up some memory if // the limit is exceeded by calling any added GC functions. Returns true if the limit is // exceeded after calling the GC functions. Returns false if there is no limit. - bool LimitExceeded(); - - // Like LimitExceeded() but may also return true if the soft memory limit is exceeded. - // The greater the excess, the higher the chance that it returns true. - // - // If the soft limit is exceeded and 'current_capacity_pct' is not NULL, the percentage - // of the hard limit consumed is written to it. - bool SoftLimitExceeded(double* current_capacity_pct); - - // Combines the semantics of AnyLimitExceeded() and SoftLimitExceeded(). - // - // Note: if there's more than one soft limit defined, the probability of it being - // exceeded in at least one tracker is much higher (as each soft limit check is an - // independent event). 
- bool AnySoftLimitExceeded(double* current_capacity_pct); + bool LimitExceeded() { + return limit_ >= 0 && limit_ < consumption(); + } // Returns the maximum consumption that can be made without exceeding the limit on // this tracker or any of its parents. Returns int64_t::max() if there are no @@ -193,62 +142,19 @@ class MemTracker : public std::enable_shared_from_this { return consumption_.current_value(); } - // Note that if consumption_ is based on consumption_func_, this - // will be the max value we've recorded in consumption(), not - // necessarily the highest value consumption_func_ has ever - // reached. int64_t peak_consumption() const { return consumption_.max_value(); } // Retrieve the parent tracker, or NULL If one is not set. std::shared_ptr parent() const { return parent_; } - // Add a function 'f' to be called if the limit is reached. - // 'f' does not need to be thread-safe as long as it is added to only one MemTracker. - // Note that 'f' must be valid for the lifetime of this MemTracker. - void AddGcFunction(GcFunction f) { - gc_functions_.push_back(f); - } - - // Logs the usage of this tracker and all of its children (recursively). - std::string LogUsage(const std::string& prefix = "") const; - - void EnableLogging(bool enable, bool log_stack) { - enable_logging_ = enable; - log_stack_ = log_stack; - } - // Returns a textual representation of the tracker that is likely (but not // guaranteed) to be globally unique. std::string ToString() const; private: - // Function signatures for gauge-style memory trackers (where consumption is - // periodically observed rather than explicitly tracked). - // - // Currently only used by the root tracker. - typedef boost::function ConsumptionFunction; - - // If consumption_func is not empty, uses it as the consumption value. - // Consume()/Release() can still be called. // byte_limit < 0 means no limit // 'id' is the label for LogUsage() and web UI. 
- MemTracker(ConsumptionFunction consumption_func, int64_t byte_limit, - const std::string& id, std::shared_ptr parent); - - bool CheckLimitExceeded() const { - return limit_ >= 0 && limit_ < consumption(); - } - - // If consumption is higher than max_consumption, attempts to free memory by calling any - // added GC functions. Returns true if max_consumption is still exceeded. Takes - // gc_lock. Updates metrics if initialized. - bool GcMemory(int64_t max_consumption); - - // Called when the total release memory is larger than GC_RELEASE_SIZE. - // TcMalloc holds onto released memory and very slowly (if ever) releases it back to - // the OS. This is problematic since it is memory we are not constantly tracking which - // can cause us to go way over mem limits. - void GcTcmalloc(); + MemTracker(int64_t byte_limit, const std::string& id, std::shared_ptr parent); // Further initializes the tracker. void Init(); @@ -256,12 +162,6 @@ class MemTracker : public std::enable_shared_from_this { // Adds tracker to child_trackers_. void AddChildTracker(const std::shared_ptr& tracker); - // Logs the stack of the current consume/release. Used for debugging only. - void LogUpdate(bool is_consume, int64_t bytes) const; - - static std::string LogUsage(const std::string& prefix, - const std::list>& trackers); - // Variant of FindTracker() that must be called with a non-NULL parent. static bool FindTrackerInternal( const std::string& id, @@ -271,25 +171,13 @@ class MemTracker : public std::enable_shared_from_this { // Creates the root tracker. static void CreateRootTracker(); - // Size, in bytes, that is considered a large value for Release() (or Consume() with - // a negative value). If tcmalloc is used, this can trigger it to GC. - // A higher value will make us call into tcmalloc less often (and therefore more - // efficient). A lower value will mean our memory overhead is lower. - // TODO: this is a stopgap. 
- static const int64_t GC_RELEASE_SIZE = 128 * 1024L * 1024L; - - simple_spinlock gc_lock_; - int64_t limit_; - int64_t soft_limit_; const std::string id_; const std::string descr_; std::shared_ptr parent_; HighWaterMark consumption_; - ConsumptionFunction consumption_func_; - // this tracker plus all of its ancestors std::vector all_trackers_; // all_trackers_ with valid limits @@ -304,17 +192,6 @@ class MemTracker : public std::enable_shared_from_this { // Iterator into parent_->child_trackers_ for this object. Stored to have O(1) // remove. std::list>::iterator child_tracker_it_; - - // Functions to call after the limit is reached to free memory. - std::vector gc_functions_; - - ThreadSafeRandom rand_; - - // If true, logs to INFO every consume/release called. Used for debugging. - bool enable_logging_; - - // If true, log the stack as well. - bool log_stack_; }; // An std::allocator that manipulates a MemTracker during allocation http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/process_memory.cc ---------------------------------------------------------------------- diff --git a/src/kudu/util/process_memory.cc b/src/kudu/util/process_memory.cc new file mode 100644 index 0000000..d3868c2 --- /dev/null +++ b/src/kudu/util/process_memory.cc @@ -0,0 +1,235 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include +#include + +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/debug/trace_event.h" +#include "kudu/util/env.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/mem_tracker.h" +#include "kudu/util/process_memory.h" +#include "kudu/util/random.h" + +DEFINE_int64(memory_limit_hard_bytes, 0, + "Maximum amount of memory this daemon should use, in bytes. " + "A value of 0 autosizes based on the total system memory. " + "A value of -1 disables all memory limiting."); +TAG_FLAG(memory_limit_hard_bytes, stable); + +DEFINE_int32(memory_limit_soft_percentage, 60, + "Percentage of the hard memory limit that this daemon may " + "consume before memory throttling of writes begins. The greater " + "the excess, the higher the chance of throttling. 
In general, a " + "lower soft limit leads to smoother write latencies but " + "decreased throughput, and vice versa for a higher soft limit."); +TAG_FLAG(memory_limit_soft_percentage, advanced); + +DEFINE_int32(memory_limit_warn_threshold_percentage, 98, + "Percentage of the hard memory limit that this daemon may " + "consume before WARNING level messages are periodically logged."); +TAG_FLAG(memory_limit_warn_threshold_percentage, advanced); + +#ifdef TCMALLOC_ENABLED +DEFINE_int32(tcmalloc_max_free_bytes_percentage, 10, + "Maximum percentage of the RSS that tcmalloc is allowed to use for " + "reserved but unallocated memory."); +TAG_FLAG(tcmalloc_max_free_bytes_percentage, advanced); +#endif + +using strings::Substitute; + +namespace kudu { +namespace process_memory { + +namespace { +int64_t g_hard_limit; +int64_t g_soft_limit; + +ThreadSafeRandom* g_rand = nullptr; + +#ifdef TCMALLOC_ENABLED +// Total amount of memory released since the last GC. If this +// is greater than GC_RELEASE_SIZE, this will trigger a tcmalloc gc. +Atomic64 g_released_memory_since_gc; + +// Size, in bytes, that is considered a large value for Release() (or Consume() with +// a negative value). If tcmalloc is used, this can trigger it to GC. +// A higher value will make us call into tcmalloc less often (and therefore more +// efficient). A lower value will mean our memory overhead is lower. +// TODO(todd): this is a stopgap. +const int64_t GC_RELEASE_SIZE = 128 * 1024L * 1024L; + +#endif // TCMALLOC_ENABLED + +} // anonymous namespace + + +// Flag validation +// ------------------------------------------------------------ +// Validate that various flags are percentages. 
+static bool ValidatePercentage(const char* flagname, int value) { + if (value >= 0 && value <= 100) { + return true; + } + LOG(ERROR) << Substitute("$0 must be a percentage, value $1 is invalid", + flagname, value); + return false; +} + +static bool dummy[] = { + google::RegisterFlagValidator(&FLAGS_memory_limit_soft_percentage, &ValidatePercentage), + google::RegisterFlagValidator(&FLAGS_memory_limit_warn_threshold_percentage, &ValidatePercentage) +#ifdef TCMALLOC_ENABLED + ,google::RegisterFlagValidator(&FLAGS_tcmalloc_max_free_bytes_percentage, &ValidatePercentage) +#endif +}; + + +// Wrappers around tcmalloc functionality +// ------------------------------------------------------------ +#ifdef TCMALLOC_ENABLED +static int64_t GetTCMallocProperty(const char* prop) { + size_t value; + if (!MallocExtension::instance()->GetNumericProperty(prop, &value)) { + LOG(DFATAL) << "Failed to get tcmalloc property " << prop; + } + return value; +} + +static int64_t GetTCMallocCurrentAllocatedBytes() { + return GetTCMallocProperty("generic.current_allocated_bytes"); +} + +void GcTcmalloc() { + TRACE_EVENT0("process", "GcTcmalloc"); + + // Number of bytes in the 'NORMAL' free list (i.e reserved by tcmalloc but + // not in use). + int64_t bytes_overhead = GetTCMallocProperty("tcmalloc.pageheap_free_bytes"); + // Bytes allocated by the application. + int64_t bytes_used = GetTCMallocCurrentAllocatedBytes(); + + int64_t max_overhead = bytes_used * FLAGS_tcmalloc_max_free_bytes_percentage / 100.0; + if (bytes_overhead > max_overhead) { + int64_t extra = bytes_overhead - max_overhead; + while (extra > 0) { + // Release 1MB at a time, so that tcmalloc releases its page heap lock + // allowing other threads to make progress. This still disrupts the current + // thread, but is better than disrupting all. 
+ MallocExtension::instance()->ReleaseToSystem(1024 * 1024); + extra -= 1024 * 1024; + } + } +} +#endif // TCMALLOC_ENABLED + + +// Consumption and soft memory limit behavior +// ------------------------------------------------------------ +namespace { +void InitLimits() { + static std::once_flag once; + std::call_once(once, [&]() { + int64_t limit = FLAGS_memory_limit_hard_bytes; + if (limit == 0) { + // If no limit is provided, we'll use 80% of system RAM. + int64_t total_ram; + CHECK_OK(Env::Default()->GetTotalRAMBytes(&total_ram)); + limit = total_ram * 4; + limit /= 5; + } + g_hard_limit = limit; + g_soft_limit = FLAGS_memory_limit_soft_percentage * g_hard_limit / 100; + + g_rand = new ThreadSafeRandom(1); + + LOG(INFO) << StringPrintf("Process hard memory limit is %.6f GB", + (static_cast(g_hard_limit) / (1024.0 * 1024.0 * 1024.0))); + LOG(INFO) << StringPrintf("Process soft memory limit is %.6f GB", + (static_cast(g_soft_limit) / + (1024.0 * 1024.0 * 1024.0))); + }); +} +} // anonymous namespace + +int64_t CurrentConsumption() { + // TODO(todd): this is slow to call frequently, since it takes the tcmalloc + // global lock. We should look into whether it's faster to hook malloc/free + // and update a LongAdder instead, or otherwise make this more incrementally + // tracked. +#ifdef TCMALLOC_ENABLED + return GetTCMallocCurrentAllocatedBytes(); +#else + // Without tcmalloc, we have no reliable way of determining our own heap + // size (e.g. mallinfo doesn't work in ASAN builds). So, we'll fall back + // to just looking at the sum of our tracked memory. + return MemTracker::GetRootTracker()->consumption(); +#endif +} + +int64_t HardLimit() { + return g_hard_limit; +} + +bool SoftLimitExceeded(double* current_capacity_pct) { + InitLimits(); + int64_t consumption = CurrentConsumption(); + // Did we exceed the actual limit? 
+ if (consumption > g_hard_limit) { + if (current_capacity_pct) { + *current_capacity_pct = static_cast(consumption) / g_hard_limit * 100; + } + return true; + } + + // No soft limit defined. + if (g_hard_limit == g_soft_limit) { + return false; + } + + // Are we under the soft limit threshold? + if (consumption < g_soft_limit) { + return false; + } + + // We're over the threshold; were we randomly chosen to be over the soft limit? + if (consumption + g_rand->Uniform64(g_hard_limit - g_soft_limit) > g_hard_limit) { + if (current_capacity_pct) { + *current_capacity_pct = static_cast(consumption) / g_hard_limit * 100; + } + return true; + } + return false; +} + +void MaybeGCAfterRelease(int64_t released_bytes) { +#ifdef TCMALLOC_ENABLED + int64_t now_released = base::subtle::NoBarrier_AtomicIncrement( + &g_released_memory_since_gc, -released_bytes); + if (PREDICT_FALSE(now_released > GC_RELEASE_SIZE)) { + base::subtle::NoBarrier_Store(&g_released_memory_since_gc, 0); + GcTcmalloc(); + } +#endif +} + +} // namespace process_memory +} // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/process_memory.h ---------------------------------------------------------------------- diff --git a/src/kudu/util/process_memory.h b/src/kudu/util/process_memory.h new file mode 100644 index 0000000..2868bf5 --- /dev/null +++ b/src/kudu/util/process_memory.h @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <cstdint> + +namespace kudu { +namespace process_memory { + +// Probabilistically returns true if the process-wide soft memory limit is exceeded. +// The greater the excess, the higher the chance that it returns true. +// +// If the soft limit is exceeded and 'current_capacity_pct' is not NULL, the percentage +// of the hard limit consumed is written to it. +bool SoftLimitExceeded(double* current_capacity_pct); + +// Potentially trigger a call to release tcmalloc memory back to the +// OS, after the given amount of memory was released. +void MaybeGCAfterRelease(int64_t released_bytes); + +// Return the total current memory consumption of the process. +int64_t CurrentConsumption(); + +// Return the configured hard limit for the process. +int64_t HardLimit(); + +} // namespace process_memory +} // namespace kudu
Id | Parent | Limit | Current Consumption