kudu-commits mailing list archives

From a...@apache.org
Subject [kudu] 05/08: Various mostly cosmetic improvements to the maintenance manager
Date Wed, 23 Jan 2019 17:54:19 GMT
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit c1c187aa620fbcd0cb51781907ff6fefe08b0853
Author: Will Berkeley <wdberkeley@gmail.org>
AuthorDate: Fri Jan 18 15:03:18 2019 -0800

    Various mostly cosmetic improvements to the maintenance manager
    
    Change-Id: I2489a10d092c422ab9e3a0a85b3238fec3b329a6
    Reviewed-on: http://gerrit.cloudera.org:8080/12246
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <adar@cloudera.com>
---
 src/kudu/util/debug/trace_logging.h  |   5 ++
 src/kudu/util/maintenance_manager.cc | 149 ++++++++++++++++++-----------------
 src/kudu/util/maintenance_manager.h  |  43 +++++-----
 3 files changed, 105 insertions(+), 92 deletions(-)

diff --git a/src/kudu/util/debug/trace_logging.h b/src/kudu/util/debug/trace_logging.h
index 1a3b39e..2f0fec9 100644
--- a/src/kudu/util/debug/trace_logging.h
+++ b/src/kudu/util/debug/trace_logging.h
@@ -73,11 +73,16 @@
     } ) ? static_cast<void>(0) :                                      \
          google::LogMessageVoidify() & VLOG_AND_TRACE_INTERNAL(category, vlevel) // NOLINT(*)
 
+#define VLOG_AND_TRACE_WITH_PREFIX(category, vlevel) \
+  VLOG_AND_TRACE(category, vlevel) << LogPrefix()
 
 #define LOG_AND_TRACE(category, severity) \
   kudu::debug::TraceGLog(__FILE__, __LINE__, category, \
                         google::GLOG_ ## severity, /* send_to_log= */true).stream()
 
+#define LOG_AND_TRACE_WITH_PREFIX(category, severity) \
+  LOG_AND_TRACE(category, severity) << LogPrefix()
+
 namespace kudu {
 namespace debug {
 
diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc
index a1909bb..d7689de 100644
--- a/src/kudu/util/maintenance_manager.cc
+++ b/src/kudu/util/maintenance_manager.cc
@@ -64,11 +64,12 @@ DEFINE_int32(maintenance_manager_polling_interval_ms, 250,
 TAG_FLAG(maintenance_manager_polling_interval_ms, hidden);
 
 DEFINE_int32(maintenance_manager_history_size, 8,
-       "Number of completed operations the manager is keeping track of.");
+       "Number of completed operations the manager keeps track of.");
 TAG_FLAG(maintenance_manager_history_size, hidden);
 
 DEFINE_bool(enable_maintenance_manager, true,
-       "Enable the maintenance manager, runs compaction and tablet cleaning tasks.");
+       "Enable the maintenance manager, which runs flush, compaction, and "
+       "garbage collection operations on tablets.");
 TAG_FLAG(enable_maintenance_manager, unsafe);
 
 DEFINE_int64(log_target_replay_size_mb, 1024,
@@ -115,11 +116,11 @@ MaintenanceOp::MaintenanceOp(std::string name, IOUsage io_usage)
 
 MaintenanceOp::~MaintenanceOp() {
   CHECK(!manager_.get()) << "You must unregister the " << name_
-         << " Op before destroying it.";
+                         << " Op before destroying it.";
 }
 
 void MaintenanceOp::Unregister() {
-  CHECK(manager_.get()) << "Op " << name_ << " was never registered.";
+  CHECK(manager_.get()) << "Op " << name_ << " is not registered.";
   manager_->UnregisterOp(this);
 }
 
@@ -155,8 +156,10 @@ MaintenanceManager::MaintenanceManager(const Options& options,
     completed_ops_count_(0),
     rand_(GetRandomSeed32()),
     memory_pressure_func_(&process_memory::UnderMemoryPressure) {
-  CHECK_OK(ThreadPoolBuilder("MaintenanceMgr").set_min_threads(num_threads_)
-               .set_max_threads(num_threads_).Build(&thread_pool_));
+  CHECK_OK(ThreadPoolBuilder("MaintenanceMgr")
+               .set_min_threads(num_threads_)
+               .set_max_threads(num_threads_)
+               .Build(&thread_pool_));
   uint32_t history_size = options.history_size == 0 ?
                           FLAGS_maintenance_manager_history_size :
                           options.history_size;
@@ -200,29 +203,28 @@ void MaintenanceManager::RegisterOp(MaintenanceOp* op) {
   CHECK(op);
   std::lock_guard<Mutex> guard(lock_);
   CHECK(!op->manager_) << "Tried to register " << op->name()
-          << ", but it was already registered.";
+                       << ", but it is already registered.";
   pair<OpMapTy::iterator, bool> val
     (ops_.insert(OpMapTy::value_type(op, MaintenanceOpStats())));
-  CHECK(val.second)
-      << "Tried to register " << op->name()
-      << ", but it already exists in ops_.";
+  CHECK(val.second) << "Tried to register " << op->name()
+                    << ", but it already exists in ops_.";
   op->manager_ = shared_from_this();
   op->cond_.reset(new ConditionVariable(&lock_));
-  VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "Registered " << op->name();
+  VLOG_AND_TRACE_WITH_PREFIX("maintenance", 1) << "Registered " << op->name();
 }
 
 void MaintenanceManager::UnregisterOp(MaintenanceOp* op) {
   {
     std::lock_guard<Mutex> guard(lock_);
     CHECK(op->manager_.get() == this) << "Tried to unregister " << op->name()
-          << ", but it is not currently registered with this maintenance manager.";
+        << ", but it is not currently registered with this maintenance manager.";
     auto iter = ops_.find(op);
     CHECK(iter != ops_.end()) << "Tried to unregister " << op->name()
-        << ", but it was never registered";
+                              << ", but it was never registered";
     // While the op is running, wait for it to be finished.
     if (iter->first->running_ > 0) {
-      VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "Waiting for op " << op->name()
-                                       << " to finish so we can unregister it.";
+      VLOG_AND_TRACE_WITH_PREFIX("maintenance", 1) << "Waiting for op " << op->name()
+                                                   << " to finish so we can unregister it.";
     }
     op->CancelAndDisable();
     while (iter->first->running_ > 0) {
@@ -236,7 +238,7 @@ void MaintenanceManager::UnregisterOp(MaintenanceOp* op) {
   }
   LOG_WITH_PREFIX(INFO) << "Unregistered op " << op->name();
   op->cond_.reset();
-  // Remove the op's shared_ptr reference to us.  This might 'delete this'.
+  // Remove the op's shared_ptr reference to us. This might 'delete this'.
   op->manager_.reset();
 }
 
@@ -268,21 +270,21 @@ void MaintenanceManager::RunSchedulerThread() {
       prev_iter_found_no_work = false;
     }
     if (shutdown_) {
-      VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "Shutting down maintenance manager.";
+      VLOG_AND_TRACE_WITH_PREFIX("maintenance", 1) << "Shutting down maintenance manager.";
       return;
     }
 
     // Find the best op.
-    pair<MaintenanceOp*, string> op_and_note = FindBestOp();
-    auto* op = op_and_note.first;
-    const auto& note = op_and_note.second;
+    auto best_op_and_why = FindBestOp();
+    auto* op = best_op_and_why.first;
+    const auto& note = best_op_and_why.second;
 
     // If we found no work to do, then we should sleep before trying again to schedule.
     // Otherwise, we can go right into trying to find the next op.
     prev_iter_found_no_work = (op == nullptr);
     if (!op) {
-      VLOG_AND_TRACE("maintenance", 2) << LogPrefix()
-                                       << "No maintenance operations look worth doing.";
+      VLOG_AND_TRACE_WITH_PREFIX("maintenance", 2)
+          << "No maintenance operations look worth doing.";
       continue;
     }
 
@@ -294,15 +296,15 @@ void MaintenanceManager::RunSchedulerThread() {
     guard.lock();
     if (!ready) {
       LOG_WITH_PREFIX(INFO) << "Prepare failed for " << op->name()
-                            << ".  Re-running scheduler.";
+                            << ". Re-running scheduler.";
       op->running_--;
       running_ops_--;
       op->cond_->Signal();
       continue;
     }
 
-    LOG_AND_TRACE("maintenance", INFO) << LogPrefix() << "Scheduling "
-                                       << op->name() << ": " << note;
+    LOG_AND_TRACE_WITH_PREFIX("maintenance", INFO)
+        << Substitute("Scheduling $0: $1", op->name(), note);
     // Run the maintenance operation.
     Status s = thread_pool_->SubmitFunc(boost::bind(
         &MaintenanceManager::LaunchOp, this, op));
@@ -326,10 +328,10 @@ void MaintenanceManager::RunSchedulerThread() {
 //
 // In the third priority we're at a point where nothing's urgent and there's nothing we can run
 // quickly.
-// TODO We currently optimize for freeing log retention but we could consider having some sort of
-// sliding priority between log retention and RAM usage. For example, is an Op that frees
-// 128MB of log retention and 12MB of RAM always better than an op that frees 12MB of log retention
-// and 128MB of RAM? Maybe a more holistic approach would be better.
+// TODO(wdberkeley) We currently optimize for freeing log retention but we could consider having
+// some sort of sliding priority between log retention and RAM usage. For example, is an Op that
+// frees 128MB of log retention and 12MB of RAM always better than an op that frees 12MB of log
+// retention and 128MB of RAM? Maybe a more holistic approach would be better.
 pair<MaintenanceOp*, string> MaintenanceManager::FindBestOp() {
   TRACE_EVENT0("maintenance", "MaintenanceManager::FindBestOp");
 
@@ -341,8 +343,8 @@ pair<MaintenanceOp*, string> MaintenanceManager::FindBestOp() {
   int64_t low_io_most_logs_retained_bytes = 0;
   MaintenanceOp* low_io_most_logs_retained_bytes_op = nullptr;
 
-  uint64_t most_mem_anchored = 0;
-  MaintenanceOp* most_mem_anchored_op = nullptr;
+  int64_t most_ram_anchored = 0;
+  MaintenanceOp* most_ram_anchored_op = nullptr;
 
   int64_t most_logs_retained_bytes = 0;
   int64_t most_logs_retained_bytes_ram_anchored = 0;
@@ -353,7 +355,7 @@ pair<MaintenanceOp*, string> MaintenanceManager::FindBestOp() {
 
   double best_perf_improvement = 0;
   MaintenanceOp* best_perf_improvement_op = nullptr;
-  for (OpMapTy::value_type &val : ops_) {
+  for (auto& val : ops_) {
     MaintenanceOp* op(val.first);
     MaintenanceOpStats& stats(val.second);
     VLOG_WITH_PREFIX(3) << "Considering MM op " << op->name();
@@ -363,66 +365,71 @@ pair<MaintenanceOp*, string> MaintenanceManager::FindBestOp() {
     if (op->cancelled() || !stats.valid() || !stats.runnable()) {
       continue;
     }
-    if (stats.logs_retained_bytes() > low_io_most_logs_retained_bytes &&
+
+    const auto logs_retained_bytes = stats.logs_retained_bytes();
+    if (logs_retained_bytes > low_io_most_logs_retained_bytes &&
         op->io_usage() == MaintenanceOp::LOW_IO_USAGE) {
       low_io_most_logs_retained_bytes_op = op;
-      low_io_most_logs_retained_bytes = stats.logs_retained_bytes();
-      VLOG_AND_TRACE("maintenance", 2) << LogPrefix() << "Op " << op->name() << " can free "
-                                       << stats.logs_retained_bytes() << " bytes of logs";
+      low_io_most_logs_retained_bytes = logs_retained_bytes;
+      VLOG_AND_TRACE_WITH_PREFIX("maintenance", 2)
+          << Substitute("Op $0 can free $1 bytes of logs",
+                        op->name(), logs_retained_bytes);
     }
 
-    if (stats.ram_anchored() > most_mem_anchored) {
-      most_mem_anchored_op = op;
-      most_mem_anchored = stats.ram_anchored();
+    const auto ram_anchored = stats.ram_anchored();
+    if (ram_anchored > most_ram_anchored) {
+      most_ram_anchored_op = op;
+      most_ram_anchored = ram_anchored;
     }
-    // We prioritize ops that can free more logs, but when it's the same we pick the one that
-    // also frees up the most memory.
-    if (stats.logs_retained_bytes() > 0 &&
-        (stats.logs_retained_bytes() > most_logs_retained_bytes ||
-            (stats.logs_retained_bytes() == most_logs_retained_bytes &&
-                stats.ram_anchored() > most_logs_retained_bytes_ram_anchored))) {
+
+    // We prioritize ops that can free more logs, but when it's the same we pick
+    // the one that also frees up the most memory.
+    if (std::make_pair(logs_retained_bytes, ram_anchored) >
+        std::make_pair(most_logs_retained_bytes,
+                       most_logs_retained_bytes_ram_anchored)) {
       most_logs_retained_bytes_op = op;
-      most_logs_retained_bytes = stats.logs_retained_bytes();
-      most_logs_retained_bytes_ram_anchored = stats.ram_anchored();
+      most_logs_retained_bytes = logs_retained_bytes;
+      most_logs_retained_bytes_ram_anchored = ram_anchored;
     }
 
-    if (stats.data_retained_bytes() > most_data_retained_bytes) {
+    const auto data_retained_bytes = stats.data_retained_bytes();
+    if (data_retained_bytes > most_data_retained_bytes) {
       most_data_retained_bytes_op = op;
-      most_data_retained_bytes = stats.data_retained_bytes();
-      VLOG_AND_TRACE("maintenance", 2) << LogPrefix() << "Op " << op->name() << " can free "
-                                       << stats.data_retained_bytes() << " bytes of data";
+      most_data_retained_bytes = data_retained_bytes;
+      VLOG_AND_TRACE_WITH_PREFIX("maintenance", 2)
+          << Substitute("Op $0 can free $1 bytes of data",
+                        op->name(), data_retained_bytes);
     }
 
+    const auto perf_improvement = stats.perf_improvement();
     if ((!best_perf_improvement_op) ||
-        (stats.perf_improvement() > best_perf_improvement)) {
+        (perf_improvement > best_perf_improvement)) {
       best_perf_improvement_op = op;
-      best_perf_improvement = stats.perf_improvement();
+      best_perf_improvement = perf_improvement;
     }
   }
 
   // Look at ops that we can run quickly that free up log retention.
-  if (low_io_most_logs_retained_bytes_op) {
-    if (low_io_most_logs_retained_bytes > 0) {
-      string notes = Substitute("free $0 bytes of WAL", low_io_most_logs_retained_bytes);
-      return {low_io_most_logs_retained_bytes_op, std::move(notes)};
-    }
+  if (low_io_most_logs_retained_bytes_op && low_io_most_logs_retained_bytes > 0) {
+    string notes = Substitute("free $0 bytes of WAL", low_io_most_logs_retained_bytes);
+    return {low_io_most_logs_retained_bytes_op, std::move(notes)};
   }
 
   // Look at free memory. If it is dangerously low, we must select something
   // that frees memory-- the op with the most anchored memory.
   double capacity_pct;
   if (memory_pressure_func_(&capacity_pct)) {
-    if (!most_mem_anchored_op) {
+    if (!most_ram_anchored_op) {
       std::string msg = StringPrintf("System under memory pressure "
           "(%.2f%% of limit used). However, there are no ops currently "
           "runnable which would free memory.", capacity_pct);
-      LOG_WITH_PREFIX(INFO) << msg;
+      KLOG_EVERY_N_SECS(WARNING, 5) << msg;
       return {nullptr, msg};
     }
     string note = StringPrintf("under memory pressure (%.2f%% used, "
                                "can flush %" PRIu64 " bytes)",
-                               capacity_pct, most_mem_anchored);
-    return {most_mem_anchored_op, std::move(note)};
+                               capacity_pct, most_ram_anchored);
+    return {most_ram_anchored_op, std::move(note)};
   }
 
   if (most_logs_retained_bytes_op &&
@@ -478,7 +485,7 @@ void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
     running_ops_--;
     op->running_--;
     op->cond_->Signal();
-    cond_.Signal(); // wake up scheduler
+    cond_.Signal(); // Wake up scheduler.
   });
 
   scoped_refptr<Trace> trace(new Trace);
@@ -500,20 +507,20 @@ void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
 void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb) {
   DCHECK(out_pb != nullptr);
   std::lock_guard<Mutex> guard(lock_);
-  pair<MaintenanceOp*, string> best_op_and_why = FindBestOp();
+  auto best_op_and_why = FindBestOp();
   auto* best_op = best_op_and_why.first;
 
-  for (MaintenanceManager::OpMapTy::value_type& val : ops_) {
+  for (const auto& val : ops_) {
     MaintenanceManagerStatusPB_MaintenanceOpPB* op_pb = out_pb->add_registered_operations();
     MaintenanceOp* op(val.first);
-    MaintenanceOpStats& stat(val.second);
+    const MaintenanceOpStats& stats(val.second);
     op_pb->set_name(op->name());
     op_pb->set_running(op->running());
-    if (stat.valid()) {
-      op_pb->set_runnable(stat.runnable());
-      op_pb->set_ram_anchored_bytes(stat.ram_anchored());
-      op_pb->set_logs_retained_bytes(stat.logs_retained_bytes());
-      op_pb->set_perf_improvement(stat.perf_improvement());
+    if (stats.valid()) {
+      op_pb->set_runnable(stats.runnable());
+      op_pb->set_ram_anchored_bytes(stats.ram_anchored());
+      op_pb->set_logs_retained_bytes(stats.logs_retained_bytes());
+      op_pb->set_perf_improvement(stats.perf_improvement());
     } else {
       op_pb->set_runnable(false);
       op_pb->set_ram_anchored_bytes(0);
diff --git a/src/kudu/util/maintenance_manager.h b/src/kudu/util/maintenance_manager.h
index 7d20c8a..c5dd8ac 100644
--- a/src/kudu/util/maintenance_manager.h
+++ b/src/kudu/util/maintenance_manager.h
@@ -68,12 +68,12 @@ class MaintenanceOpStats {
     runnable_ = runnable;
   }
 
-  uint64_t ram_anchored() const {
+  int64_t ram_anchored() const {
     DCHECK(valid_);
     return ram_anchored_;
   }
 
-  void set_ram_anchored(uint64_t ram_anchored) {
+  void set_ram_anchored(int64_t ram_anchored) {
     UpdateLastModified();
     ram_anchored_ = ram_anchored;
   }
@@ -132,9 +132,9 @@ class MaintenanceOpStats {
   bool runnable_;
 
   // The approximate amount of memory that not doing this operation keeps
-  // around.  This number is used to decide when to start freeing memory, so it
-  // should be fairly accurate.  May be 0.
-  uint64_t ram_anchored_;
+  // around. This number is used to decide when to start freeing memory, so it
+  // should be fairly accurate. May be 0.
+  int64_t ram_anchored_;
 
   // Approximate amount of disk space in WAL files that would be freed if this
   // operation ran. May be 0.
@@ -160,14 +160,15 @@ struct OpInstance {
   std::string name;
   // Time the operation took to run. Value is unitialized if instance is still running.
   MonoDelta duration;
+  // The time at which the operation was launched.
   MonoTime start_mono_time;
 
   MaintenanceManagerStatusPB_OpInstancePB DumpToPB() const;
 };
 
 // MaintenanceOp objects represent background operations that the
-// MaintenanceManager can schedule.  Once a MaintenanceOp is registered, the
-// manager will periodically poll it for statistics.  The registrant is
+// MaintenanceManager can schedule. Once a MaintenanceOp is registered, the
+// manager will periodically poll it for statistics. The registrant is
 // responsible for managing the memory associated with the MaintenanceOp object.
 // Op objects should be unregistered before being de-allocated.
 class MaintenanceOp {
@@ -186,20 +187,20 @@ class MaintenanceOp {
   // Unregister this op, if it is currently registered.
   void Unregister();
 
-  // Update the op statistics.  This will be called every scheduling period
+  // Update the op statistics. This will be called every scheduling period
   // (about a few times a second), so it should not be too expensive.  It's
   // possible for the returned statistics to be invalid; the caller should
-  // call MaintenanceOpStats::valid() before using them.  This will be run
+  // call MaintenanceOpStats::valid() before using them. This will be run
   // under the MaintenanceManager lock.
   virtual void UpdateStats(MaintenanceOpStats* stats) = 0;
 
-  // Prepare to perform the operation.  This will be run without holding the
-  // maintenance manager lock.  It should be short, since it is run from the
+  // Prepare to perform the operation. This will be run without holding the
+  // maintenance manager lock. It should be short, since it is run from the
   // context of the maintenance op scheduler thread rather than a worker thread.
   // If this returns false, we will abort the operation.
   virtual bool Prepare() = 0;
 
-  // Perform the operation.  This will be run without holding the maintenance
+  // Perform the operation. This will be run without holding the maintenance
   // manager lock, and may take a long time.
   virtual void Perform() = 0;
 
@@ -207,15 +208,15 @@ class MaintenanceOp {
   virtual scoped_refptr<Histogram> DurationHistogram() const = 0;
 
   // Returns the gauge for this op that tracks when this op is running. Cannot be NULL.
-  virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const = 0;
+  virtual scoped_refptr<AtomicGauge<uint32_t>> RunningGauge() const = 0;
 
   uint32_t running() { return running_; }
 
-  std::string name() const { return name_; }
+  const std::string& name() const { return name_; }
 
   IOUsage io_usage() const { return io_usage_; }
 
-  // Return true if the operation has been cancelled due to Unregister() pending.
+  // Return true if the operation has been cancelled due to a pending Unregister().
   bool cancelled() const {
     return cancel_.Load();
   }
@@ -234,7 +235,7 @@ class MaintenanceOp {
   // The name of the operation.  Op names must be unique.
   const std::string name_;
 
-  // The number of times that this op is currently running.
+  // The number of instances of this op that are currently running.
   uint32_t running_;
 
   // Set when we are trying to unregister the maintenance operation.
@@ -246,7 +247,7 @@ class MaintenanceOp {
   //
   // Note: 'cond_' is used with the MaintenanceManager's mutex. As such,
   // it only exists when the op is registered.
-  gscoped_ptr<ConditionVariable> cond_;
+  std::unique_ptr<ConditionVariable> cond_;
 
   // The MaintenanceManager with which this op is registered, or null
   // if it is not registered.
@@ -263,8 +264,8 @@ struct MaintenanceOpComparator {
 };
 
 // The MaintenanceManager manages the scheduling of background operations such
-// as flushes or compactions.  It runs these operations in the background, in a
-// thread pool.  It uses information provided in MaintenanceOpStats objects to
+// as flushes or compactions. It runs these operations in the background on a
+// thread pool. It uses information provided in MaintenanceOpStats objects to
 // decide which operations, if any, to run.
 class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManager> {
  public:
@@ -322,7 +323,7 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
 
   const std::string server_uuid_;
   const int32_t num_threads_;
-  OpMapTy ops_; // registered operations
+  OpMapTy ops_; // Registered operations.
   Mutex lock_;
   scoped_refptr<kudu::Thread> monitor_thread_;
   gscoped_ptr<ThreadPool> thread_pool_;
@@ -342,7 +343,7 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
 
   // Running instances lock.
   //
-  // This is separate of lock_ so that worker threads don't need to take the
+  // This is separate from lock_ so that worker threads don't need to take the
   // global MM lock when beginning operations. When taking both
   // running_instances_lock_ and lock_, lock_ must be acquired first.
   Mutex running_instances_lock_;


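One functional detail in the patch worth calling out: the FindBestOp() change replaces the old hand-written two-field comparison with a single lexicographic comparison of std::make_pair(logs_retained_bytes, ram_anchored), so ops that retain more log bytes win, and ties are broken by the amount of RAM anchored. The following is a minimal standalone sketch of that ordering; it is not part of the patch, and the variable names and byte counts are illustrative only.

    // Sketch (not from the patch): std::pair's relational operators compare the
    // first elements and fall back to the second only on a tie, which expresses
    // "most logs retained first, then most RAM anchored" in one expression.
    #include <cstdint>
    #include <iostream>
    #include <utility>

    int main() {
      // {logs_retained_bytes, ram_anchored} for two hypothetical ops.
      std::pair<int64_t, int64_t> candidate{128, 12};
      std::pair<int64_t, int64_t> best{12, 128};

      // More logs retained wins outright, regardless of RAM anchored.
      std::cout << (candidate > best) << std::endl;  // prints 1

      // Equal logs retained: the tie is broken by RAM anchored.
      best = {128, 64};
      std::cout << (candidate > best) << std::endl;  // prints 0
      return 0;
    }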