kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jdcry...@apache.org
Subject [2/3] kudu git commit: Simplify MemTracker and move process throttling elsewhere
Date Thu, 27 Apr 2017 03:18:22 GMT
Simplify MemTracker and move process throttling elsewhere

This takes a first step towards simplifying MemTracker:

- Remove the "GC function" callbacks (we never used this)

- Remove the 'ExpandLimits' code which was unimplemented.

- Remove the logging functionality, which we've never used
  as far as I can remember.

- Remove soft limiting. We only used this on the root tracker, so
  I just moved it to a new process_memory.{h,cc}

- Remove 'consumption_func' and un-tie the root tracker from
  the global process memory usage. Now the root tracker is a simple
  sum of its descendents.

For a stress/benchmark I ran a 500GB YCSB with a memory limit set to
10GB. Results showed no major difference with this patch (throughput was
a few percent faster but within the realm of noise). Details at [1]

[1] https://docs.google.com/document/d/1dOe5-L5BWUhF-uV4-AE5hduvfUWctXizlaoSLlp38yM/edit?usp=sharing

Change-Id: Id16bad7d9a29a83e820a38e9d703811391cffe90
Reviewed-on: http://gerrit.cloudera.org:8080/6620
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <adar@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/2be32d54
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/2be32d54
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/2be32d54

Branch: refs/heads/master
Commit: 2be32d5441827e0914ad9b5008545003bfda4575
Parents: f03d47e
Author: Todd Lipcon <todd@apache.org>
Authored: Wed Apr 12 17:06:38 2017 -0700
Committer: Todd Lipcon <todd@apache.org>
Committed: Wed Apr 26 21:25:47 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/raft_consensus.cc            |   3 +-
 .../integration-tests/raft_consensus-itest.cc   |   8 +-
 src/kudu/server/default-path-handlers.cc        |  24 +-
 src/kudu/tablet/multi_column_writer.cc          |   2 +-
 src/kudu/tserver/tablet_service.cc              |   3 +-
 src/kudu/util/CMakeLists.txt                    |   1 +
 src/kudu/util/maintenance_manager-test.cc       |  51 ++-
 src/kudu/util/maintenance_manager.cc            |  10 +-
 src/kudu/util/maintenance_manager.h             |  13 +-
 src/kudu/util/mem_tracker-test.cc               | 115 -------
 src/kudu/util/mem_tracker.cc                    | 327 +------------------
 src/kudu/util/mem_tracker.h                     | 153 +--------
 src/kudu/util/process_memory.cc                 | 235 +++++++++++++
 src/kudu/util/process_memory.h                  |  41 +++
 14 files changed, 373 insertions(+), 613 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 6a87719..f75057f 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -43,6 +43,7 @@
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/pb_util.h"
+#include "kudu/util/process_memory.h"
 #include "kudu/util/random.h"
 #include "kudu/util/random_util.h"
 #include "kudu/util/threadpool.h"
@@ -1188,7 +1189,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
       // This request contains at least one message, and is likely to increase
       // our memory pressure.
       double capacity_pct;
-      if (parent_mem_tracker_->AnySoftLimitExceeded(&capacity_pct)) {
+      if (process_memory::SoftLimitExceeded(&capacity_pct)) {
         follower_memory_pressure_rejections_->Increment();
         string msg = StringPrintf(
             "Soft memory limit exceeded (at %.2f%% of capacity)",

http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 579e3f3..0f15d84 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -2251,8 +2251,14 @@ TEST_F(RaftConsensusITest, TestEarlyCommitDespiteMemoryPressure) {
   master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
 
   // Very low memory limit to ease testing.
+  // When using tcmalloc, we set it to 30MB, since we can get accurate process memory
+  // usage statistics. Otherwise, set to only 4MB, since we'll only be throttling based
+  // on our tracked memory.
+#ifdef TCMALLOC_ENABLED
+  ts_flags.push_back("--memory_limit_hard_bytes=30000000");
+#else
   ts_flags.push_back("--memory_limit_hard_bytes=4194304");
-
+#endif
   // Don't let transaction memory tracking get in the way.
   ts_flags.push_back("--tablet_transaction_memory_limit_mb=-1");
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/server/default-path-handlers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/server/default-path-handlers.cc b/src/kudu/server/default-path-handlers.cc
index 4182c39..f4f9dd5 100644
--- a/src/kudu/server/default-path-handlers.cc
+++ b/src/kudu/server/default-path-handlers.cc
@@ -38,12 +38,12 @@
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/flags.h"
 #include "kudu/util/histogram.pb.h"
+#include "kudu/util/jsonwriter.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/metrics.h"
-#include "kudu/util/jsonwriter.h"
+#include "kudu/util/process_memory.h"
 
-using boost::replace_all;
 using std::ifstream;
 using std::string;
 using std::endl;
@@ -138,13 +138,29 @@ static void MemUsageHandler(const Webserver::WebRequest& req, std::ostringstream
   MallocExtension::instance()->GetStats(buf, 2048);
   // Replace new lines with <br> for html
   string tmp(buf);
-  replace_all(tmp, "\n", tags.line_break);
+  boost::replace_all(tmp, "\n", tags.line_break);
   (*output) << tmp << tags.end_pre_tag;
 #endif
 }
 
 // Registered to handle "/mem-trackers", and prints out to handle memory tracker information.
-static void MemTrackersHandler(const Webserver::WebRequest& req, std::ostringstream* output) {
+static void MemTrackersHandler(const Webserver::WebRequest& /*req*/, std::ostringstream* output) {
+  *output << "<h1>Process memory usage</h1>\n";
+  *output << "<table class='table table-striped'>\n";
+  *output << Substitute("  <tr><th>Total consumption</th><td>$0</td></tr>\n",
+                        HumanReadableNumBytes::ToString(process_memory::CurrentConsumption()));
+  *output << Substitute("  <tr><th>Memory limit</th><td>$0</td></tr>\n",
+                        HumanReadableNumBytes::ToString(process_memory::HardLimit()));
+  *output << "</table>\n";
+#ifndef TCMALLOC_ENABLED
+  *output << R"(
+      <div class="alert alert-warning">
+        <strong>NOTE:</strong> This build of Kudu has not enabled tcmalloc.
+        The above process memory stats will be inaccurate.
+      </div>
+               )";
+#endif
+
   *output << "<h1>Memory usage by subsystem</h1>\n";
   *output << "<table class='table table-striped'>\n";
   *output << "  <tr><th>Id</th><th>Parent</th><th>Limit</th><th>Current Consumption</th>"

http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/tablet/multi_column_writer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/multi_column_writer.cc b/src/kudu/tablet/multi_column_writer.cc
index 51d6c29..7b19df8 100644
--- a/src/kudu/tablet/multi_column_writer.cc
+++ b/src/kudu/tablet/multi_column_writer.cc
@@ -83,10 +83,10 @@ Status MultiColumnWriter::Open() {
     RETURN_NOT_OK_PREPEND(writer->Start(),
                           "Unable to Start() writer for column " + col.ToString());
 
-    LOG(INFO) << "Opened CFile writer for column " << col.ToString();
     cfile_writers_.push_back(writer.release());
     block_ids_.push_back(block_id);
   }
+  LOG(INFO) << "Opened CFile writers for " << cfile_writers_.size() << " column(s)";
 
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 91b288c..3fa9abb 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -57,6 +57,7 @@
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/pb_util.h"
+#include "kudu/util/process_memory.h"
 #include "kudu/util/status.h"
 #include "kudu/util/status_callback.h"
 #include "kudu/util/trace.h"
@@ -747,7 +748,7 @@ void TabletServiceImpl::Write(const WriteRequestPB* req,
   // Check for memory pressure; don't bother doing any additional work if we've
   // exceeded the limit.
   double capacity_pct;
-  if (tablet->mem_tracker()->AnySoftLimitExceeded(&capacity_pct)) {
+  if (process_memory::SoftLimitExceeded(&capacity_pct)) {
     tablet->metrics()->leader_memory_pressure_rejections->Increment();
     string msg = StringPrintf(
         "Soft memory limit exceeded (at %.2f%% of capacity)",

http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index 7d049a9..04fe5e8 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -169,6 +169,7 @@ set(UTIL_SRCS
   path_util.cc
   pb_util.cc
   pb_util-internal.cc
+  process_memory.cc
   random_util.cc
   resettable_heartbeater.cc
   rolling_log.cc

http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/maintenance_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/maintenance_manager-test.cc b/src/kudu/util/maintenance_manager-test.cc
index e5dcc96..98dc9ba 100644
--- a/src/kudu/util/maintenance_manager-test.cc
+++ b/src/kudu/util/maintenance_manager-test.cc
@@ -15,14 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <gflags/gflags.h>
-#include <gtest/gtest.h>
+#include <atomic>
 #include <memory>
 #include <mutex>
 #include <vector>
 
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/tablet/tablet.pb.h"
 #include "kudu/util/maintenance_manager.h"
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/metrics.h"
@@ -53,13 +54,15 @@ static const char kFakeUuid[] = "12345";
 class MaintenanceManagerTest : public KuduTest {
  public:
   void SetUp() override {
-    test_tracker_ = MemTracker::CreateTracker(1000, "test");
     MaintenanceManager::Options options;
     options.num_threads = 2;
     options.polling_interval_ms = 1;
     options.history_size = kHistorySize;
-    options.parent_mem_tracker = test_tracker_;
     manager_.reset(new MaintenanceManager(options));
+    manager_->set_memory_pressure_func_for_tests(
+        [&](double* consumption) {
+          return indicate_memory_pressure_.load();
+        });
     ASSERT_OK(manager_->Init(kFakeUuid));
   }
 
@@ -68,8 +71,8 @@ class MaintenanceManagerTest : public KuduTest {
   }
 
  protected:
-  shared_ptr<MemTracker> test_tracker_;
   shared_ptr<MaintenanceManager> manager_;
+  std::atomic<bool> indicate_memory_pressure_ { false };
 };
 
 // Just create the MaintenanceManager and then shut it down, to make sure
@@ -80,10 +83,9 @@ TEST_F(MaintenanceManagerTest, TestCreateAndShutdown) {
 class TestMaintenanceOp : public MaintenanceOp {
  public:
   TestMaintenanceOp(const std::string& name,
-                    IOUsage io_usage,
-                    const shared_ptr<MemTracker>& tracker)
+                    IOUsage io_usage)
     : MaintenanceOp(name, io_usage),
-      consumption_(tracker, 500),
+      ram_anchored_(500),
       logs_retained_bytes_(0),
       perf_improvement_(0),
       metric_entity_(METRIC_ENTITY_test.Instantiate(&metric_registry_, "test")),
@@ -124,7 +126,7 @@ class TestMaintenanceOp : public MaintenanceOp {
   virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE {
     std::lock_guard<Mutex> guard(lock_);
     stats->set_runnable(remaining_runs_ > 0);
-    stats->set_ram_anchored(consumption_.consumption());
+    stats->set_ram_anchored(ram_anchored_);
     stats->set_logs_retained_bytes(logs_retained_bytes_);
     stats->set_perf_improvement(perf_improvement_);
   }
@@ -141,7 +143,7 @@ class TestMaintenanceOp : public MaintenanceOp {
 
   void set_ram_anchored(uint64_t ram_anchored) {
     std::lock_guard<Mutex> guard(lock_);
-    consumption_.Reset(ram_anchored);
+    ram_anchored_ = ram_anchored;
   }
 
   void set_logs_retained_bytes(uint64_t logs_retained_bytes) {
@@ -165,7 +167,7 @@ class TestMaintenanceOp : public MaintenanceOp {
  private:
   Mutex lock_;
 
-  ScopedTrackedConsumption consumption_;
+  uint64_t ram_anchored_;
   uint64_t logs_retained_bytes_;
   uint64_t perf_improvement_;
   MetricRegistry metric_registry_;
@@ -187,8 +189,8 @@ class TestMaintenanceOp : public MaintenanceOp {
 // running and verify that UnregisterOp waits for it to finish before
 // proceeding.
 TEST_F(MaintenanceManagerTest, TestRegisterUnregister) {
-  TestMaintenanceOp op1("1", MaintenanceOp::HIGH_IO_USAGE, test_tracker_);
-  op1.set_ram_anchored(1001);
+  TestMaintenanceOp op1("1", MaintenanceOp::HIGH_IO_USAGE);
+  op1.set_perf_improvement(10);
   // Register initially with no remaining runs. We'll later enable it once it's
   // already registered.
   op1.set_remaining_runs(0);
@@ -207,8 +209,8 @@ TEST_F(MaintenanceManagerTest, TestRegisterUnregister) {
 // Regression test for KUDU-1495: when an operation is being unregistered,
 // new instances of that operation should not be scheduled.
 TEST_F(MaintenanceManagerTest, TestNewOpsDontGetScheduledDuringUnregister) {
-  TestMaintenanceOp op1("1", MaintenanceOp::HIGH_IO_USAGE, test_tracker_);
-  op1.set_ram_anchored(1001);
+  TestMaintenanceOp op1("1", MaintenanceOp::HIGH_IO_USAGE);
+  op1.set_perf_improvement(10);
 
   // Set the op to run up to 10 times, and each time should sleep for a second.
   op1.set_remaining_runs(10);
@@ -231,7 +233,7 @@ TEST_F(MaintenanceManagerTest, TestNewOpsDontGetScheduledDuringUnregister) {
 // Test that we'll run an operation that doesn't improve performance when memory
 // pressure gets high.
 TEST_F(MaintenanceManagerTest, TestMemoryPressure) {
-  TestMaintenanceOp op("op", MaintenanceOp::HIGH_IO_USAGE, test_tracker_);
+  TestMaintenanceOp op("op", MaintenanceOp::HIGH_IO_USAGE);
   op.set_ram_anchored(100);
   manager_->RegisterOp(&op);
 
@@ -239,16 +241,13 @@ TEST_F(MaintenanceManagerTest, TestMemoryPressure) {
   SleepFor(MonoDelta::FromMilliseconds(20));
   ASSERT_EQ(0, op.DurationHistogram()->TotalCount());
 
-  // set the ram_anchored by the high mem op so high that we'll have to run it.
-  scoped_refptr<kudu::Thread> thread;
-  ASSERT_OK(Thread::Create("TestThread", "MaintenanceManagerTest",
-      boost::bind(&TestMaintenanceOp::set_ram_anchored, &op, 1100), &thread));
+  // Fake that the server is under memory pressure.
+  indicate_memory_pressure_ = true;
 
   AssertEventually([&]() {
       ASSERT_EQ(op.DurationHistogram()->TotalCount(), 1);
     });
   manager_->UnregisterOp(&op);
-  ThreadJoiner(thread.get()).Join();
 }
 
 // Test that ops are prioritized correctly when we add log retention.
@@ -257,15 +256,15 @@ TEST_F(MaintenanceManagerTest, TestLogRetentionPrioritization) {
 
   manager_->Shutdown();
 
-  TestMaintenanceOp op1("op1", MaintenanceOp::LOW_IO_USAGE, test_tracker_);
+  TestMaintenanceOp op1("op1", MaintenanceOp::LOW_IO_USAGE);
   op1.set_ram_anchored(0);
   op1.set_logs_retained_bytes(100 * kMB);
 
-  TestMaintenanceOp op2("op2", MaintenanceOp::HIGH_IO_USAGE, test_tracker_);
+  TestMaintenanceOp op2("op2", MaintenanceOp::HIGH_IO_USAGE);
   op2.set_ram_anchored(100);
   op2.set_logs_retained_bytes(100 * kMB);
 
-  TestMaintenanceOp op3("op3", MaintenanceOp::HIGH_IO_USAGE, test_tracker_);
+  TestMaintenanceOp op3("op3", MaintenanceOp::HIGH_IO_USAGE);
   op3.set_ram_anchored(200);
   op3.set_logs_retained_bytes(100 * kMB);
 
@@ -299,7 +298,7 @@ TEST_F(MaintenanceManagerTest, TestLogRetentionPrioritization) {
 TEST_F(MaintenanceManagerTest, TestCompletedOpsHistory) {
   for (int i = 0; i < 5; i++) {
     string name = Substitute("op$0", i);
-    TestMaintenanceOp op(name, MaintenanceOp::HIGH_IO_USAGE, test_tracker_);
+    TestMaintenanceOp op(name, MaintenanceOp::HIGH_IO_USAGE);
     op.set_perf_improvement(1);
     op.set_ram_anchored(100);
     manager_->RegisterOp(&op);

http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/maintenance_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc
index eda71e8..d2206dc 100644
--- a/src/kudu/util/maintenance_manager.cc
+++ b/src/kudu/util/maintenance_manager.cc
@@ -29,8 +29,8 @@
 #include "kudu/util/debug/trace_logging.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
-#include "kudu/util/mem_tracker.h"
 #include "kudu/util/metrics.h"
+#include "kudu/util/process_memory.h"
 #include "kudu/util/random_util.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/thread.h"
@@ -115,7 +115,6 @@ const MaintenanceManager::Options MaintenanceManager::DEFAULT_OPTIONS = {
   .num_threads = 0,
   .polling_interval_ms = 0,
   .history_size = 0,
-  .parent_mem_tracker = shared_ptr<MemTracker>(),
 };
 
 MaintenanceManager::MaintenanceManager(const Options& options)
@@ -128,9 +127,8 @@ MaintenanceManager::MaintenanceManager(const Options& options)
           FLAGS_maintenance_manager_polling_interval_ms :
           options.polling_interval_ms),
     completed_ops_count_(0),
-    parent_mem_tracker_(!options.parent_mem_tracker ?
-        MemTracker::GetRootTracker() : options.parent_mem_tracker),
-    rand_(GetRandomSeed32()) {
+    rand_(GetRandomSeed32()),
+    memory_pressure_func_(&process_memory::SoftLimitExceeded) {
   CHECK_OK(ThreadPoolBuilder("MaintenanceMgr").set_min_threads(num_threads_)
                .set_max_threads(num_threads_).Build(&thread_pool_));
   uint32_t history_size = options.history_size == 0 ?
@@ -364,7 +362,7 @@ MaintenanceOp* MaintenanceManager::FindBestOp() {
   // Look at free memory. If it is dangerously low, we must select something
   // that frees memory-- the op with the most anchored memory.
   double capacity_pct;
-  if (parent_mem_tracker_->AnySoftLimitExceeded(&capacity_pct)) {
+  if (memory_pressure_func_(&capacity_pct)) {
     if (!most_mem_anchored_op) {
       string msg = StringPrintf("we have exceeded our soft memory limit "
           "(current capacity is %.2f%%).  However, there are no ops currently "

http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/maintenance_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/maintenance_manager.h b/src/kudu/util/maintenance_manager.h
index 5c432a7..6070e2d 100644
--- a/src/kudu/util/maintenance_manager.h
+++ b/src/kudu/util/maintenance_manager.h
@@ -19,6 +19,7 @@
 
 #include <stdint.h>
 
+#include <functional>
 #include <map>
 #include <memory>
 #include <set>
@@ -42,7 +43,6 @@ template<class T>
 class AtomicGauge;
 class Histogram;
 class MaintenanceManager;
-class MemTracker;
 
 class MaintenanceOpStats {
  public:
@@ -259,7 +259,6 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
     int32_t num_threads;
     int32_t polling_interval_ms;
     uint32_t history_size;
-    std::shared_ptr<MemTracker> parent_mem_tracker;
   };
 
   explicit MaintenanceManager(const Options& options);
@@ -278,6 +277,11 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
 
   void GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb);
 
+  void set_memory_pressure_func_for_tests(std::function<bool(double*)> f) {
+    std::lock_guard<Mutex> guard(lock_);
+    memory_pressure_func_ = std::move(f);
+  }
+
   static const Options DEFAULT_OPTIONS;
 
  private:
@@ -307,10 +311,13 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
   // the completed_ops_count_ % the vector's size and then the count needs to be incremented.
   std::vector<CompletedOp> completed_ops_;
   int64_t completed_ops_count_;
-  std::shared_ptr<MemTracker> parent_mem_tracker_;
   std::string server_uuid_;
   Random rand_;
 
+  // Function which should return true if the server is under global memory pressure.
+  // This is indirected for testing purposes.
+  std::function<bool(double*)> memory_pressure_func_;
+
   DISALLOW_COPY_AND_ASSIGN(MaintenanceManager);
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/mem_tracker-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/mem_tracker-test.cc b/src/kudu/util/mem_tracker-test.cc
index 11bf2a2..7e78cbe 100644
--- a/src/kudu/util/mem_tracker-test.cc
+++ b/src/kudu/util/mem_tracker-test.cc
@@ -24,14 +24,9 @@
 #include <utility>
 #include <vector>
 
-#include <boost/bind.hpp>
-#include <gperftools/malloc_extension.h>
-
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/test_util.h"
 
-DECLARE_int32(memory_limit_soft_percentage);
-
 namespace kudu {
 
 using std::equal_to;
@@ -127,54 +122,6 @@ class GcFunctionHelper {
   MemTracker* tracker_;
 };
 
-TEST(MemTrackerTest, GcFunctions) {
-  shared_ptr<MemTracker> t = MemTracker::CreateTracker(10, "");
-  ASSERT_TRUE(t->has_limit());
-
-  t->Consume(9);
-  EXPECT_FALSE(t->LimitExceeded());
-
-  // Test TryConsume()
-  EXPECT_FALSE(t->TryConsume(2));
-  EXPECT_EQ(t->consumption(), 9);
-  EXPECT_FALSE(t->LimitExceeded());
-
-  // Attach GcFunction that releases 1 byte
-  GcFunctionHelper gc_func_helper(t.get());
-  t->AddGcFunction(boost::bind(&GcFunctionHelper::GcFunc, &gc_func_helper));
-  EXPECT_TRUE(t->TryConsume(2));
-  EXPECT_EQ(t->consumption(), 10);
-  EXPECT_FALSE(t->LimitExceeded());
-
-  // GcFunction will be called even though TryConsume() fails
-  EXPECT_FALSE(t->TryConsume(2));
-  EXPECT_EQ(t->consumption(), 9);
-  EXPECT_FALSE(t->LimitExceeded());
-
-  // GcFunction won't be called
-  EXPECT_TRUE(t->TryConsume(1));
-  EXPECT_EQ(t->consumption(), 10);
-  EXPECT_FALSE(t->LimitExceeded());
-
-  // Test LimitExceeded()
-  t->Consume(1);
-  EXPECT_EQ(t->consumption(), 11);
-  EXPECT_FALSE(t->LimitExceeded());
-  EXPECT_EQ(t->consumption(), 10);
-
-  // Add more GcFunctions, test that we only call them until the limit is no longer
-  // exceeded
-  GcFunctionHelper gc_func_helper2(t.get());
-  t->AddGcFunction(boost::bind(&GcFunctionHelper::GcFunc, &gc_func_helper2));
-  GcFunctionHelper gc_func_helper3(t.get());
-  t->AddGcFunction(boost::bind(&GcFunctionHelper::GcFunc, &gc_func_helper3));
-  t->Consume(1);
-  EXPECT_EQ(t->consumption(), 11);
-  EXPECT_FALSE(t->LimitExceeded());
-  EXPECT_EQ(t->consumption(), 10);
-  t->Release(10);
-}
-
 TEST(MemTrackerTest, STLContainerAllocator) {
   shared_ptr<MemTracker> t = MemTracker::CreateTracker(-1, "t");
   MemTrackerAllocator<int> vec_alloc(t);
@@ -249,68 +196,6 @@ TEST(MemTrackerTest, ScopedTrackedConsumption) {
   ASSERT_EQ(0, m->consumption());
 }
 
-TEST(MemTrackerTest, SoftLimitExceeded) {
-  const int kNumIters = 100000;
-  const int kMemLimit = 1000;
-  google::FlagSaver saver;
-  FLAGS_memory_limit_soft_percentage = 0;
-  shared_ptr<MemTracker> m = MemTracker::CreateTracker(kMemLimit, "test");
-
-  // Consumption is 0; the soft limit is never exceeded.
-  for (int i = 0; i < kNumIters; i++) {
-    ASSERT_FALSE(m->SoftLimitExceeded(nullptr));
-  }
-
-  // Consumption is half of the actual limit, so we expect to exceed the soft
-  // limit roughly half the time.
-  ScopedTrackedConsumption consumption(m, kMemLimit / 2);
-  int exceeded_count = 0;
-  for (int i = 0; i < kNumIters; i++) {
-    double current_percentage;
-    if (m->SoftLimitExceeded(&current_percentage)) {
-      exceeded_count++;
-      ASSERT_NEAR(50, current_percentage, 0.1);
-    }
-  }
-  double exceeded_pct = static_cast<double>(exceeded_count) / kNumIters * 100;
-  ASSERT_TRUE(exceeded_pct > 47 && exceeded_pct < 52);
-
-  // Consumption is over the limit; the soft limit is always exceeded.
-  consumption.Reset(kMemLimit + 1);
-  for (int i = 0; i < kNumIters; i++) {
-    double current_percentage;
-    ASSERT_TRUE(m->SoftLimitExceeded(&current_percentage));
-    ASSERT_NEAR(100, current_percentage, 0.1);
-  }
-}
-
-#ifdef TCMALLOC_ENABLED
-TEST(MemTrackerTest, TcMallocRootTracker) {
-  shared_ptr<MemTracker> root = MemTracker::GetRootTracker();
-
-  // The root tracker's consumption and tcmalloc should agree.
-  size_t value;
-  root->UpdateConsumption();
-  ASSERT_TRUE(MallocExtension::instance()->GetNumericProperty(
-      "generic.current_allocated_bytes", &value));
-  ASSERT_EQ(value, root->consumption());
-
-  // Explicit Consume() and Release() have no effect.
-  root->Consume(100);
-  ASSERT_EQ(value, root->consumption());
-  root->Release(3);
-  ASSERT_EQ(value, root->consumption());
-
-  // But if we allocate something really big, we should see a change.
-  gscoped_ptr<char[]> big_alloc(new char[4*1024*1024]);
-  // clang in release mode can optimize out the above allocation unless
-  // we do something with the pointer... so we just log it.
-  VLOG(8) << static_cast<void*>(big_alloc.get());
-  root->UpdateConsumption();
-  ASSERT_GT(root->consumption(), value);
-}
-#endif
-
 TEST(MemTrackerTest, CollisionDetection) {
   shared_ptr<MemTracker> p = MemTracker::CreateTracker(-1, "parent");
   shared_ptr<MemTracker> c = MemTracker::CreateTracker(-1, "child", p);

http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/mem_tracker.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/mem_tracker.cc b/src/kudu/util/mem_tracker.cc
index b247cb4..88e1d23 100644
--- a/src/kudu/util/mem_tracker.cc
+++ b/src/kudu/util/mem_tracker.cc
@@ -23,49 +23,14 @@
 #include <list>
 #include <memory>
 #include <mutex>
-#include <sstream>
-
-#include <gperftools/malloc_extension.h>
 
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/once.h"
-#include "kudu/gutil/strings/join.h"
-#include "kudu/gutil/strings/human_readable.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/debug-util.h"
-#include "kudu/util/debug/trace_event.h"
-#include "kudu/util/env.h"
-#include "kudu/util/flag_tags.h"
 #include "kudu/util/mutex.h"
-#include "kudu/util/random_util.h"
+#include "kudu/util/process_memory.h"
 #include "kudu/util/status.h"
 
-DEFINE_int64(memory_limit_hard_bytes, 0,
-             "Maximum amount of memory this daemon should use, in bytes. "
-             "A value of 0 autosizes based on the total system memory. "
-             "A value of -1 disables all memory limiting.");
-TAG_FLAG(memory_limit_hard_bytes, stable);
-
-DEFINE_int32(memory_limit_soft_percentage, 60,
-             "Percentage of the hard memory limit that this daemon may "
-             "consume before memory throttling of writes begins. The greater "
-             "the excess, the higher the chance of throttling. In general, a "
-             "lower soft limit leads to smoother write latencies but "
-             "decreased throughput, and vice versa for a higher soft limit.");
-TAG_FLAG(memory_limit_soft_percentage, advanced);
-
-DEFINE_int32(memory_limit_warn_threshold_percentage, 98,
-             "Percentage of the hard memory limit that this daemon may "
-             "consume before WARNING level messages are periodically logged.");
-TAG_FLAG(memory_limit_warn_threshold_percentage, advanced);
-
-#ifdef TCMALLOC_ENABLED
-DEFINE_int32(tcmalloc_max_free_bytes_percentage, 10,
-             "Maximum percentage of the RSS that tcmalloc is allowed to use for "
-             "reserved but unallocated memory.");
-TAG_FLAG(tcmalloc_max_free_bytes_percentage, advanced);
-#endif
-
 namespace kudu {
 
 // NOTE: this class has been adapted from Impala, so the code style varies
@@ -73,7 +38,6 @@ namespace kudu {
 
 using std::deque;
 using std::list;
-using std::ostringstream;
 using std::shared_ptr;
 using std::string;
 using std::vector;
@@ -85,94 +49,29 @@ using strings::Substitute;
 static shared_ptr<MemTracker> root_tracker;
 static GoogleOnceType root_tracker_once = GOOGLE_ONCE_INIT;
 
-// Total amount of memory from calls to Release() since the last GC. If this
-// is greater than GC_RELEASE_SIZE, this will trigger a tcmalloc gc.
-static Atomic64 released_memory_since_gc;
-
-// Validate that various flags are percentages.
-static bool ValidatePercentage(const char* flagname, int value) {
-  if (value >= 0 && value <= 100) {
-    return true;
-  }
-  LOG(ERROR) << Substitute("$0 must be a percentage, value $1 is invalid",
-                           flagname, value);
-  return false;
-}
-static bool dummy[] = {
-  google::RegisterFlagValidator(&FLAGS_memory_limit_soft_percentage, &ValidatePercentage),
-  google::RegisterFlagValidator(&FLAGS_memory_limit_warn_threshold_percentage, &ValidatePercentage)
-#ifdef TCMALLOC_ENABLED
-  ,google::RegisterFlagValidator(&FLAGS_tcmalloc_max_free_bytes_percentage, &ValidatePercentage)
-#endif
-};
-
-#ifdef TCMALLOC_ENABLED
-static int64_t GetTCMallocProperty(const char* prop) {
-  size_t value;
-  if (!MallocExtension::instance()->GetNumericProperty(prop, &value)) {
-    LOG(DFATAL) << "Failed to get tcmalloc property " << prop;
-  }
-  return value;
-}
-
-static int64_t GetTCMallocCurrentAllocatedBytes() {
-  return GetTCMallocProperty("generic.current_allocated_bytes");
-}
-#endif
-
 void MemTracker::CreateRootTracker() {
-  int64_t limit = FLAGS_memory_limit_hard_bytes;
-  if (limit == 0) {
-    // If no limit is provided, we'll use 80% of system RAM.
-    int64_t total_ram;
-    CHECK_OK(Env::Default()->GetTotalRAMBytes(&total_ram));
-    limit = total_ram * 4;
-    limit /= 5;
-  }
-
-  ConsumptionFunction f;
-#ifdef TCMALLOC_ENABLED
-  f = &GetTCMallocCurrentAllocatedBytes;
-#endif
-  root_tracker.reset(new MemTracker(f, limit, "root",
-                                    shared_ptr<MemTracker>()));
+  root_tracker.reset(new MemTracker(-1, "root", shared_ptr<MemTracker>()));
   root_tracker->Init();
-  LOG(INFO) << StringPrintf("MemTracker: hard memory limit is %.6f GB",
-                            (static_cast<float>(limit) / (1024.0 * 1024.0 * 1024.0)));
-  LOG(INFO) << StringPrintf("MemTracker: soft memory limit is %.6f GB",
-                            (static_cast<float>(root_tracker->soft_limit_) /
-                                (1024.0 * 1024.0 * 1024.0)));
 }
 
 shared_ptr<MemTracker> MemTracker::CreateTracker(int64_t byte_limit,
                                                  const string& id,
                                                  const shared_ptr<MemTracker>& parent) {
   shared_ptr<MemTracker> real_parent = parent ? parent : GetRootTracker();
-  shared_ptr<MemTracker> tracker(
-      new MemTracker(ConsumptionFunction(), byte_limit, id, real_parent));
+  shared_ptr<MemTracker> tracker(new MemTracker(byte_limit, id, real_parent));
   real_parent->AddChildTracker(tracker);
   tracker->Init();
 
   return tracker;
 }
 
-MemTracker::MemTracker(ConsumptionFunction consumption_func, int64_t byte_limit,
-                       const string& id, shared_ptr<MemTracker> parent)
+MemTracker::MemTracker(int64_t byte_limit, const string& id, shared_ptr<MemTracker> parent)
     : limit_(byte_limit),
       id_(id),
       descr_(Substitute("memory consumption for $0", id)),
       parent_(std::move(parent)),
-      consumption_(0),
-      consumption_func_(std::move(consumption_func)),
-      rand_(GetRandomSeed32()),
-      enable_logging_(false),
-      log_stack_(false) {
+      consumption_(0) {
   VLOG(1) << "Creating tracker " << ToString();
-  if (consumption_func_) {
-    UpdateConsumption();
-  }
-  soft_limit_ = (limit_ == -1)
-      ? -1 : (limit_ * FLAGS_memory_limit_soft_percentage) / 100;
 }
 
 MemTracker::~MemTracker() {
@@ -284,67 +183,35 @@ void MemTracker::ListTrackers(vector<shared_ptr<MemTracker>>* trackers) {
   }
 }
 
-void MemTracker::UpdateConsumption() {
-  DCHECK(!consumption_func_.empty());
-  DCHECK(parent_.get() == NULL);
-  consumption_.set_value(consumption_func_());
-}
-
 void MemTracker::Consume(int64_t bytes) {
   if (bytes < 0) {
     Release(-bytes);
     return;
   }
 
-  if (!consumption_func_.empty()) {
-    UpdateConsumption();
-    return;
-  }
   if (bytes == 0) {
     return;
   }
-  if (PREDICT_FALSE(enable_logging_)) {
-    LogUpdate(true, bytes);
-  }
   for (auto& tracker : all_trackers_) {
     tracker->consumption_.IncrementBy(bytes);
-    if (!tracker->consumption_func_.empty()) {
-      DCHECK_GE(tracker->consumption_.current_value(), 0);
-    }
   }
 }
 
 bool MemTracker::TryConsume(int64_t bytes) {
-  if (!consumption_func_.empty()) {
-    UpdateConsumption();
-  }
   if (bytes <= 0) {
+    Release(-bytes);
     return true;
   }
-  if (PREDICT_FALSE(enable_logging_)) {
-    LogUpdate(true, bytes);
-  }
 
   int i = 0;
-  // Walk the tracker tree top-down, to avoid expanding a limit on a child whose parent
-  // won't accommodate the change.
+  // Walk the tracker tree top-down, consuming memory from each in turn.
   for (i = all_trackers_.size() - 1; i >= 0; --i) {
     MemTracker *tracker = all_trackers_[i];
     if (tracker->limit_ < 0) {
       tracker->consumption_.IncrementBy(bytes);
     } else {
       if (!tracker->consumption_.TryIncrementBy(bytes, tracker->limit_)) {
-        // One of the trackers failed, attempt to GC memory or expand our limit. If that
-        // succeeds, TryUpdate() again. Bail if either fails.
-        if (!tracker->GcMemory(tracker->limit_ - bytes) ||
-            tracker->ExpandLimit(bytes)) {
-          if (!tracker->consumption_.TryIncrementBy(
-                  bytes, tracker->limit_)) {
-            break;
-          }
-        } else {
-          break;
-        }
+        break;
       }
     }
   }
@@ -354,14 +221,10 @@ bool MemTracker::TryConsume(int64_t bytes) {
   }
 
   // Someone failed, roll back the ones that succeeded.
-  // TODO: this doesn't roll it back completely since the max values for
+  // TODO(todd): this doesn't roll it back completely since the max values for
   // the updated trackers aren't decremented. The max values are only used
   // for error reporting so this is probably okay. Rolling those back is
   // pretty hard; we'd need something like 2PC.
-  //
-  // TODO: This might leave us with an allocated resource that we can't use. Do we need
-  // to adjust the consumption of the query tracker to stop the resource from never
-  // getting used by a subsequent TryConsume()?
   for (int j = all_trackers_.size() - 1; j > i; --j) {
     all_trackers_[j]->consumption_.IncrementBy(-bytes);
   }
@@ -374,35 +237,14 @@ void MemTracker::Release(int64_t bytes) {
     return;
   }
 
-  if (PREDICT_FALSE(base::subtle::Barrier_AtomicIncrement(&released_memory_since_gc, bytes) >
-                    GC_RELEASE_SIZE)) {
-    GcTcmalloc();
-  }
-
-  if (!consumption_func_.empty()) {
-    UpdateConsumption();
-    return;
-  }
-
   if (bytes == 0) {
     return;
   }
-  if (PREDICT_FALSE(enable_logging_)) {
-    LogUpdate(false, bytes);
-  }
 
   for (auto& tracker : all_trackers_) {
     tracker->consumption_.IncrementBy(-bytes);
-    // If a UDF calls FunctionContext::TrackAllocation() but allocates less than the
-    // reported amount, the subsequent call to FunctionContext::Free() may cause the
-    // process mem tracker to go negative until it is synced back to the tcmalloc
-    // metric. Don't blow up in this case. (Note that this doesn't affect non-process
-    // trackers since we can enforce that the reported memory usage is internally
-    // consistent.)
-    if (!tracker->consumption_func_.empty()) {
-      DCHECK_GE(tracker->consumption_.current_value(), 0);
-    }
   }
+  process_memory::MaybeGCAfterRelease(bytes);
 }
 
 bool MemTracker::AnyLimitExceeded() {
@@ -414,55 +256,6 @@ bool MemTracker::AnyLimitExceeded() {
   return false;
 }
 
-bool MemTracker::LimitExceeded() {
-  if (PREDICT_FALSE(CheckLimitExceeded())) {
-    return GcMemory(limit_);
-  }
-  return false;
-}
-
-bool MemTracker::SoftLimitExceeded(double* current_capacity_pct) {
-  // Did we exceed the actual limit?
-  if (LimitExceeded()) {
-    if (current_capacity_pct) {
-      *current_capacity_pct =
-          static_cast<double>(consumption()) / limit() * 100;
-    }
-    return true;
-  }
-
-  // No soft limit defined.
-  if (!has_limit() || limit_ == soft_limit_) {
-    return false;
-  }
-
-  // Are we under the soft limit threshold?
-  int64_t usage = consumption();
-  if (usage < soft_limit_) {
-    return false;
-  }
-
-  // We're over the threshold; were we randomly chosen to be over the soft limit?
-  if (usage + rand_.Uniform64(limit_ - soft_limit_) > limit_) {
-    bool exceeded = GcMemory(soft_limit_);
-    if (exceeded && current_capacity_pct) {
-      *current_capacity_pct =
-          static_cast<double>(consumption()) / limit() * 100;
-    }
-    return exceeded;
-  }
-  return false;
-}
-
-bool MemTracker::AnySoftLimitExceeded(double* current_capacity_pct) {
-  for (MemTracker* t : limit_trackers_) {
-    if (t->SoftLimitExceeded(current_capacity_pct)) {
-      return true;
-    }
-  }
-  return false;
-}
-
 int64_t MemTracker::SpareCapacity() const {
   int64_t result = std::numeric_limits<int64_t>::max();
   for (const auto& tracker : limit_trackers_) {
@@ -472,84 +265,6 @@ int64_t MemTracker::SpareCapacity() const {
   return result;
 }
 
-bool MemTracker::GcMemory(int64_t max_consumption) {
-  if (max_consumption < 0) {
-    // Impossible to GC enough memory to reach the goal.
-    return true;
-  }
-
-  std::lock_guard<simple_spinlock> l(gc_lock_);
-  if (!consumption_func_.empty()) {
-    UpdateConsumption();
-  }
-  uint64_t pre_gc_consumption = consumption();
-  // Check if someone gc'd before us
-  if (pre_gc_consumption < max_consumption) {
-    return false;
-  }
-
-  // Try to free up some memory
-  for (const auto& gc_function : gc_functions_) {
-    gc_function();
-    if (!consumption_func_.empty()) {
-      UpdateConsumption();
-    }
-    if (consumption() <= max_consumption) {
-      break;
-    }
-  }
-
-  return consumption() > max_consumption;
-}
-
-void MemTracker::GcTcmalloc() {
-#ifdef TCMALLOC_ENABLED
-  released_memory_since_gc = 0;
-  TRACE_EVENT0("process", "MemTracker::GcTcmalloc");
-
-  // Number of bytes in the 'NORMAL' free list (i.e reserved by tcmalloc but
-  // not in use).
-  int64_t bytes_overhead = GetTCMallocProperty("tcmalloc.pageheap_free_bytes");
-  // Bytes allocated by the application.
-  int64_t bytes_used = GetTCMallocCurrentAllocatedBytes();
-
-  int64_t max_overhead = bytes_used * FLAGS_tcmalloc_max_free_bytes_percentage / 100.0;
-  if (bytes_overhead > max_overhead) {
-    int64_t extra = bytes_overhead - max_overhead;
-    while (extra > 0) {
-      // Release 1MB at a time, so that tcmalloc releases its page heap lock
-      // allowing other threads to make progress. This still disrupts the current
-      // thread, but is better than disrupting all.
-      MallocExtension::instance()->ReleaseToSystem(1024 * 1024);
-      extra -= 1024 * 1024;
-    }
-  }
-
-#else
-  // Nothing to do if not using tcmalloc.
-#endif
-}
-
-string MemTracker::LogUsage(const string& prefix) const {
-  ostringstream ss;
-  ss << prefix << id_ << ":";
-  if (CheckLimitExceeded()) {
-    ss << " memory limit exceeded.";
-  }
-  if (limit_ > 0) {
-    ss << " Limit=" << HumanReadableNumBytes::ToString(limit_);
-  }
-  ss << " Consumption=" << HumanReadableNumBytes::ToString(consumption());
-
-  ostringstream prefix_ss;
-  prefix_ss << prefix << "  ";
-  string new_prefix = prefix_ss.str();
-  MutexLock l(child_trackers_lock_);
-  if (!child_trackers_.empty()) {
-    ss << "\n" << LogUsage(new_prefix, child_trackers_);
-  }
-  return ss.str();
-}
 
 void MemTracker::Init() {
   // populate all_trackers_ and limit_trackers_
@@ -568,28 +283,6 @@ void MemTracker::AddChildTracker(const shared_ptr<MemTracker>& tracker) {
   tracker->child_tracker_it_ = child_trackers_.insert(child_trackers_.end(), tracker);
 }
 
-void MemTracker::LogUpdate(bool is_consume, int64_t bytes) const {
-  ostringstream ss;
-  ss << this << " " << (is_consume ? "Consume: " : "Release: ") << bytes
-     << " Consumption: " << consumption() << " Limit: " << limit_;
-  if (log_stack_) {
-    ss << std::endl << GetStackTrace();
-  }
-  LOG(ERROR) << ss.str();
-}
-
-string MemTracker::LogUsage(const string& prefix,
-                            const list<weak_ptr<MemTracker>>& trackers) {
-  vector<string> usage_strings;
-  for (const auto& child_weak : trackers) {
-    shared_ptr<MemTracker> child = child_weak.lock();
-    if (child) {
-      usage_strings.push_back(child->LogUsage(prefix));
-    }
-  }
-  return JoinStrings(usage_strings, "\n");
-}
-
 shared_ptr<MemTracker> MemTracker::GetRootTracker() {
   GoogleOnceInit(&root_tracker_once, &MemTracker::CreateRootTracker);
   return root_tracker;

http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/mem_tracker.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/mem_tracker.h b/src/kudu/util/mem_tracker.h
index cd00e22..a43e9a2 100644
--- a/src/kudu/util/mem_tracker.h
+++ b/src/kudu/util/mem_tracker.h
@@ -17,7 +17,6 @@
 #ifndef KUDU_UTIL_MEM_TRACKER_H
 #define KUDU_UTIL_MEM_TRACKER_H
 
-#include <boost/function.hpp>
 #include <list>
 #include <memory>
 #include <stdint.h>
@@ -28,7 +27,6 @@
 #include "kudu/util/high_water_mark.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/mutex.h"
-#include "kudu/util/random.h"
 
 namespace kudu {
 
@@ -39,8 +37,8 @@ class MemTracker;
 // arranged into a tree structure such that the consumption tracked by a
 // MemTracker is also tracked by its ancestors.
 //
-// The MemTracker hierarchy is rooted in a single static MemTracker whose limit
-// is set via gflag. The root MemTracker always exists, and it is the common
+// The MemTracker hierarchy is rooted in a single static MemTracker.
+// The root MemTracker always exists, and it is the common
 // ancestor to all MemTrackers. All operations that discover MemTrackers begin
 // at the root and work their way down the tree, while operations that deal
 // with adjusting memory consumption begin at a particular MemTracker and work
@@ -55,51 +53,21 @@ class MemTracker;
 // and the parent has a weak reference to it. Both remain for the lifetime of
 // the MemTracker.
 //
-// By default, memory consumption is tracked via calls to Consume()/Release(), either to
-// the tracker itself or to one of its descendents. Alternatively, a consumption function
-// can specified, and then the function's value is used as the consumption rather than the
-// tally maintained by Consume() and Release(). A tcmalloc function is used to track process
-// memory consumption, since the process memory usage may be higher than the computed
-// total memory (tcmalloc does not release deallocated memory immediately).
-//
-// GcFunctions can be attached to a MemTracker in order to free up memory if the limit is
-// reached. If LimitExceeded() is called and the limit is exceeded, it will first call the
-// GcFunctions to try to free memory and recheck the limit. For example, the process
-// tracker has a GcFunction that releases any unused memory still held by tcmalloc, so
-// this will be called before the process limit is reported as exceeded. GcFunctions are
-// called in the order they are added, so expensive functions should be added last.
+// Memory consumption is tracked via calls to Consume()/Release(), either to
+// the tracker itself or to one of its descendants.
 //
 // This class is thread-safe.
-//
-// NOTE: this class has been partially ported over from Impala with
-// several changes, and as a result the style differs somewhat from
-// the Kudu style.
-//
-// Changes from Impala:
-// 1) Id a string vs. a TUniqueId
-// 2) There is no concept of query trackers vs. pool trackers -- trackers are instead
-//    associated with objects. Parent hierarchy is preserved, with the assumption that,
-//    e.g., a tablet server's memtracker will have as its children the tablets' memtrackers,
-//    which in turn will have memtrackers for their caches, logs, and so forth.
-//
-// TODO: this classes uses a lot of statics fields and methods, which
-// isn't common in Kudu. It is probably wise to later move the
-// 'registry' of trackers to a separate class, but it's better to
-// start using the 'class' *first* and then change this functionality,
-// depending on how MemTracker ends up being used in Kudu.
 class MemTracker : public std::enable_shared_from_this<MemTracker> {
  public:
-
-  // Signature for function that can be called to free some memory after limit is reached.
-  typedef boost::function<void ()> GcFunction;
-
   ~MemTracker();
 
   // Creates and adds the tracker to the tree so that it can be retrieved with
   // FindTracker/FindOrCreateTracker.
   //
-  // byte_limit < 0 means no limit; 'id' is a used as a label for LogUsage()
-  // and web UI. Use the two-argument form if there is no parent.
+  // byte_limit < 0 means no limit; 'id' is a used as a label to uniquely identify
+  // the MemTracker for the below Find...() calls as well as the web UI.
+  //
+  // Use the two-argument form if there is no parent.
   static std::shared_ptr<MemTracker> CreateTracker(
       int64_t byte_limit,
       const std::string& id,
@@ -133,19 +101,9 @@ class MemTracker : public std::enable_shared_from_this<MemTracker> {
   // Gets a shared_ptr to the "root" tracker, creating it if necessary.
   static std::shared_ptr<MemTracker> GetRootTracker();
 
-  // Updates consumption from the consumption function specified in the constructor.
-  // NOTE: this method will crash if 'consumption_func_' is not set.
-  void UpdateConsumption();
-
   // Increases consumption of this tracker and its ancestors by 'bytes'.
   void Consume(int64_t bytes);
 
-  // Try to expand the limit (by asking the resource broker for more memory) by at least
-  // 'bytes'. Returns false if not possible, true if the request succeeded. May allocate
-  // more memory than was requested.
-  // TODO: always returns false for now, not yet implemented.
-  bool ExpandLimit(int64_t /* unused: bytes */) { return false; }
-
   // Increases consumption of this tracker and its ancestors by 'bytes' only if
   // they can all consume 'bytes'. If this brings any of them over, none of them
   // are updated.
@@ -153,6 +111,9 @@ class MemTracker : public std::enable_shared_from_this<MemTracker> {
   bool TryConsume(int64_t bytes);
 
   // Decreases consumption of this tracker and its ancestors by 'bytes'.
+  //
+  // This will also cause the process to periodically trigger tcmalloc "ReleaseMemory"
+  // to ensure that memory is released to the OS.
   void Release(int64_t bytes);
 
   // Returns true if a valid limit of this tracker or one of its ancestors is
@@ -162,21 +123,9 @@ class MemTracker : public std::enable_shared_from_this<MemTracker> {
   // If this tracker has a limit, checks the limit and attempts to free up some memory if
   // the limit is exceeded by calling any added GC functions. Returns true if the limit is
   // exceeded after calling the GC functions. Returns false if there is no limit.
-  bool LimitExceeded();
-
-  // Like LimitExceeded() but may also return true if the soft memory limit is exceeded.
-  // The greater the excess, the higher the chance that it returns true.
-  //
-  // If the soft limit is exceeded and 'current_capacity_pct' is not NULL, the percentage
-  // of the hard limit consumed is written to it.
-  bool SoftLimitExceeded(double* current_capacity_pct);
-
-  // Combines the semantics of AnyLimitExceeded() and SoftLimitExceeded().
-  //
-  // Note: if there's more than one soft limit defined, the probability of it being
-  // exceeded in at least one tracker is much higher (as each soft limit check is an
-  // independent event).
-  bool AnySoftLimitExceeded(double* current_capacity_pct);
+  bool LimitExceeded() {
+    return limit_ >= 0 && limit_ < consumption();
+  }
 
   // Returns the maximum consumption that can be made without exceeding the limit on
   // this tracker or any of its parents. Returns int64_t::max() if there are no
@@ -193,62 +142,19 @@ class MemTracker : public std::enable_shared_from_this<MemTracker> {
     return consumption_.current_value();
   }
 
-  // Note that if consumption_ is based on consumption_func_, this
-  // will be the max value we've recorded in consumption(), not
-  // necessarily the highest value consumption_func_ has ever
-  // reached.
   int64_t peak_consumption() const { return consumption_.max_value(); }
 
   // Retrieve the parent tracker, or NULL If one is not set.
   std::shared_ptr<MemTracker> parent() const { return parent_; }
 
-  // Add a function 'f' to be called if the limit is reached.
-  // 'f' does not need to be thread-safe as long as it is added to only one MemTracker.
-  // Note that 'f' must be valid for the lifetime of this MemTracker.
-  void AddGcFunction(GcFunction f) {
-    gc_functions_.push_back(f);
-  }
-
-  // Logs the usage of this tracker and all of its children (recursively).
-  std::string LogUsage(const std::string& prefix = "") const;
-
-  void EnableLogging(bool enable, bool log_stack) {
-    enable_logging_ = enable;
-    log_stack_ = log_stack;
-  }
-
   // Returns a textual representation of the tracker that is likely (but not
   // guaranteed) to be globally unique.
   std::string ToString() const;
 
  private:
-  // Function signatures for gauge-style memory trackers (where consumption is
-  // periodically observed rather than explicitly tracked).
-  //
-  // Currently only used by the root tracker.
-  typedef boost::function<uint64_t ()> ConsumptionFunction;
-
-  // If consumption_func is not empty, uses it as the consumption value.
-  // Consume()/Release() can still be called.
   // byte_limit < 0 means no limit
   // 'id' is the label for LogUsage() and web UI.
-  MemTracker(ConsumptionFunction consumption_func, int64_t byte_limit,
-             const std::string& id, std::shared_ptr<MemTracker> parent);
-
-  bool CheckLimitExceeded() const {
-    return limit_ >= 0 && limit_ < consumption();
-  }
-
-  // If consumption is higher than max_consumption, attempts to free memory by calling any
-  // added GC functions.  Returns true if max_consumption is still exceeded. Takes
-  // gc_lock. Updates metrics if initialized.
-  bool GcMemory(int64_t max_consumption);
-
-  // Called when the total release memory is larger than GC_RELEASE_SIZE.
-  // TcMalloc holds onto released memory and very slowly (if ever) releases it back to
-  // the OS. This is problematic since it is memory we are not constantly tracking which
-  // can cause us to go way over mem limits.
-  void GcTcmalloc();
+  MemTracker(int64_t byte_limit, const std::string& id, std::shared_ptr<MemTracker> parent);
 
   // Further initializes the tracker.
   void Init();
@@ -256,12 +162,6 @@ class MemTracker : public std::enable_shared_from_this<MemTracker> {
   // Adds tracker to child_trackers_.
   void AddChildTracker(const std::shared_ptr<MemTracker>& tracker);
 
-  // Logs the stack of the current consume/release. Used for debugging only.
-  void LogUpdate(bool is_consume, int64_t bytes) const;
-
-  static std::string LogUsage(const std::string& prefix,
-                              const std::list<std::weak_ptr<MemTracker>>& trackers);
-
   // Variant of FindTracker() that must be called with a non-NULL parent.
   static bool FindTrackerInternal(
       const std::string& id,
@@ -271,25 +171,13 @@ class MemTracker : public std::enable_shared_from_this<MemTracker> {
   // Creates the root tracker.
   static void CreateRootTracker();
 
-  // Size, in bytes, that is considered a large value for Release() (or Consume() with
-  // a negative value). If tcmalloc is used, this can trigger it to GC.
-  // A higher value will make us call into tcmalloc less often (and therefore more
-  // efficient). A lower value will mean our memory overhead is lower.
-  // TODO: this is a stopgap.
-  static const int64_t GC_RELEASE_SIZE = 128 * 1024L * 1024L;
-
-  simple_spinlock gc_lock_;
-
   int64_t limit_;
-  int64_t soft_limit_;
   const std::string id_;
   const std::string descr_;
   std::shared_ptr<MemTracker> parent_;
 
   HighWaterMark consumption_;
 
-  ConsumptionFunction consumption_func_;
-
   // this tracker plus all of its ancestors
   std::vector<MemTracker*> all_trackers_;
   // all_trackers_ with valid limits
@@ -304,17 +192,6 @@ class MemTracker : public std::enable_shared_from_this<MemTracker> {
   // Iterator into parent_->child_trackers_ for this object. Stored to have O(1)
   // remove.
   std::list<std::weak_ptr<MemTracker>>::iterator child_tracker_it_;
-
-  // Functions to call after the limit is reached to free memory.
-  std::vector<GcFunction> gc_functions_;
-
-  ThreadSafeRandom rand_;
-
-  // If true, logs to INFO every consume/release called. Used for debugging.
-  bool enable_logging_;
-
-  // If true, log the stack as well.
-  bool log_stack_;
 };
 
 // An std::allocator that manipulates a MemTracker during allocation

http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/process_memory.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/process_memory.cc b/src/kudu/util/process_memory.cc
new file mode 100644
index 0000000..d3868c2
--- /dev/null
+++ b/src/kudu/util/process_memory.cc
@@ -0,0 +1,235 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <sys/resource.h>
+
+#include <gflags/gflags.h>
+#include <gperftools/malloc_extension.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/env.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/process_memory.h"
+#include "kudu/util/random.h"
+
+DEFINE_int64(memory_limit_hard_bytes, 0,
+             "Maximum amount of memory this daemon should use, in bytes. "
+             "A value of 0 autosizes based on the total system memory. "
+             "A value of -1 disables all memory limiting.");
+TAG_FLAG(memory_limit_hard_bytes, stable);
+
+DEFINE_int32(memory_limit_soft_percentage, 60,
+             "Percentage of the hard memory limit that this daemon may "
+             "consume before memory throttling of writes begins. The greater "
+             "the excess, the higher the chance of throttling. In general, a "
+             "lower soft limit leads to smoother write latencies but "
+             "decreased throughput, and vice versa for a higher soft limit.");
+TAG_FLAG(memory_limit_soft_percentage, advanced);
+
+DEFINE_int32(memory_limit_warn_threshold_percentage, 98,
+             "Percentage of the hard memory limit that this daemon may "
+             "consume before WARNING level messages are periodically logged.");
+TAG_FLAG(memory_limit_warn_threshold_percentage, advanced);
+
+#ifdef TCMALLOC_ENABLED
+DEFINE_int32(tcmalloc_max_free_bytes_percentage, 10,
+             "Maximum percentage of the RSS that tcmalloc is allowed to use for "
+             "reserved but unallocated memory.");
+TAG_FLAG(tcmalloc_max_free_bytes_percentage, advanced);
+#endif
+
+using strings::Substitute;
+
+namespace kudu {
+namespace process_memory {
+
+namespace {
+int64_t g_hard_limit;
+int64_t g_soft_limit;
+
+ThreadSafeRandom* g_rand = nullptr;
+
+#ifdef TCMALLOC_ENABLED
+// Total amount of memory released since the last GC. If this
+// is greater than GC_RELEASE_SIZE, this will trigger a tcmalloc gc.
+Atomic64 g_released_memory_since_gc;
+
+// Size, in bytes, that is considered a large value for Release() (or Consume() with
+// a negative value). If tcmalloc is used, this can trigger it to GC.
+// A higher value will make us call into tcmalloc less often (and therefore more
+// efficient). A lower value will mean our memory overhead is lower.
+// TODO(todd): this is a stopgap.
+const int64_t GC_RELEASE_SIZE = 128 * 1024L * 1024L;
+
+#endif // TCMALLOC_ENABLED
+
+} // anonymous namespace
+
+
+// Flag validation
+// ------------------------------------------------------------
+// Validate that various flags are percentages.
+static bool ValidatePercentage(const char* flagname, int value) {
+  if (value >= 0 && value <= 100) {
+    return true;
+  }
+  LOG(ERROR) << Substitute("$0 must be a percentage, value $1 is invalid",
+                           flagname, value);
+  return false;
+}
+
+static bool dummy[] = {
+  google::RegisterFlagValidator(&FLAGS_memory_limit_soft_percentage, &ValidatePercentage),
+  google::RegisterFlagValidator(&FLAGS_memory_limit_warn_threshold_percentage, &ValidatePercentage)
+#ifdef TCMALLOC_ENABLED
+  ,google::RegisterFlagValidator(&FLAGS_tcmalloc_max_free_bytes_percentage, &ValidatePercentage)
+#endif
+};
+
+
+// Wrappers around tcmalloc functionality
+// ------------------------------------------------------------
+#ifdef TCMALLOC_ENABLED
+static int64_t GetTCMallocProperty(const char* prop) {
+  size_t value;
+  if (!MallocExtension::instance()->GetNumericProperty(prop, &value)) {
+    LOG(DFATAL) << "Failed to get tcmalloc property " << prop;
+  }
+  return value;
+}
+
+static int64_t GetTCMallocCurrentAllocatedBytes() {
+  return GetTCMallocProperty("generic.current_allocated_bytes");
+}
+
+void GcTcmalloc() {
+  TRACE_EVENT0("process", "GcTcmalloc");
+
+  // Number of bytes in the 'NORMAL' free list (i.e reserved by tcmalloc but
+  // not in use).
+  int64_t bytes_overhead = GetTCMallocProperty("tcmalloc.pageheap_free_bytes");
+  // Bytes allocated by the application.
+  int64_t bytes_used = GetTCMallocCurrentAllocatedBytes();
+
+  int64_t max_overhead = bytes_used * FLAGS_tcmalloc_max_free_bytes_percentage / 100.0;
+  if (bytes_overhead > max_overhead) {
+    int64_t extra = bytes_overhead - max_overhead;
+    while (extra > 0) {
+      // Release 1MB at a time, so that tcmalloc releases its page heap lock
+      // allowing other threads to make progress. This still disrupts the current
+      // thread, but is better than disrupting all.
+      MallocExtension::instance()->ReleaseToSystem(1024 * 1024);
+      extra -= 1024 * 1024;
+    }
+  }
+}
+#endif // TCMALLOC_ENABLED
+
+
+// Consumption and soft memory limit behavior
+// ------------------------------------------------------------
+namespace {
+void InitLimits() {
+  static std::once_flag once;
+  std::call_once(once, [&]() {
+      int64_t limit = FLAGS_memory_limit_hard_bytes;
+      if (limit == 0) {
+        // If no limit is provided, we'll use 80% of system RAM.
+        int64_t total_ram;
+        CHECK_OK(Env::Default()->GetTotalRAMBytes(&total_ram));
+        limit = total_ram * 4;
+        limit /= 5;
+      }
+      g_hard_limit = limit;
+      g_soft_limit = FLAGS_memory_limit_soft_percentage * g_hard_limit / 100;
+
+      g_rand = new ThreadSafeRandom(1);
+
+      LOG(INFO) << StringPrintf("Process hard memory limit is %.6f GB",
+                                (static_cast<float>(g_hard_limit) / (1024.0 * 1024.0 * 1024.0)));
+      LOG(INFO) << StringPrintf("Process soft memory limit is %.6f GB",
+                                (static_cast<float>(g_soft_limit) /
+                                 (1024.0 * 1024.0 * 1024.0)));
+    });
+}
+} // anonymous namespace
+
+int64_t CurrentConsumption() {
+  // TODO(todd): this is slow to call frequently, since it takes the tcmalloc
+  // global lock. We should look into whether it's faster to hook malloc/free
+  // and update a LongAdder instead, or otherwise make this more incrementally
+  // tracked.
+#ifdef TCMALLOC_ENABLED
+  return GetTCMallocCurrentAllocatedBytes();
+#else
+  // Without tcmalloc, we have no reliable way of determining our own heap
+  // size (e.g. mallinfo doesn't work in ASAN builds). So, we'll fall back
+  // to just looking at the sum of our tracked memory.
+  return MemTracker::GetRootTracker()->consumption();
+#endif
+}
+
+int64_t HardLimit() {
+  return g_hard_limit;
+}
+
+bool SoftLimitExceeded(double* current_capacity_pct) {
+  InitLimits();
+  int64_t consumption = CurrentConsumption();
+  // Did we exceed the actual limit?
+  if (consumption > g_hard_limit) {
+    if (current_capacity_pct) {
+      *current_capacity_pct = static_cast<double>(consumption) / g_hard_limit * 100;
+    }
+    return true;
+  }
+
+  // No soft limit defined.
+  if (g_hard_limit == g_soft_limit) {
+    return false;
+  }
+
+  // Are we under the soft limit threshold?
+  if (consumption < g_soft_limit) {
+    return false;
+  }
+
+  // We're over the threshold; were we randomly chosen to be over the soft limit?
+  if (consumption + g_rand->Uniform64(g_hard_limit - g_soft_limit) > g_hard_limit) {
+    if (current_capacity_pct) {
+      *current_capacity_pct = static_cast<double>(consumption) / g_hard_limit * 100;
+    }
+    return true;
+  }
+  return false;
+}
+
+void MaybeGCAfterRelease(int64_t released_bytes) {
+#ifdef TCMALLOC_ENABLED
+  int64_t now_released = base::subtle::NoBarrier_AtomicIncrement(
+      &g_released_memory_since_gc, -released_bytes);
+  if (PREDICT_FALSE(now_released > GC_RELEASE_SIZE)) {
+    base::subtle::NoBarrier_Store(&g_released_memory_since_gc, 0);
+    GcTcmalloc();
+  }
+#endif
+}
+
+} // namespace process_memory
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/2be32d54/src/kudu/util/process_memory.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/process_memory.h b/src/kudu/util/process_memory.h
new file mode 100644
index 0000000..2868bf5
--- /dev/null
+++ b/src/kudu/util/process_memory.h
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+
+namespace kudu {
+namespace process_memory {
+
+// Probabilistically returns true if the process-wide soft memory limit is exceeded.
+// The greater the excess, the higher the chance that it returns true.
+//
+// If the soft limit is exceeded and 'current_capacity_pct' is not NULL, the percentage
+// of the hard limit consumed is written to it.
+bool SoftLimitExceeded(double* current_capacity_pct);
+
+// Potentially trigger a call to release tcmalloc memory back to the
+// OS, after the given amount of memory was released.
+void MaybeGCAfterRelease(int64_t released_bytes);
+
+// Return the total current memory consumption of the process.
+int64_t CurrentConsumption();
+
+// Return the configured hard limit for the process.
+int64_t HardLimit();
+
+} // namespace process_memory
+} // namespace kudu


Mime
View raw message