kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [2/7] kudu git commit: KUDU-2279 (part 2): metrics: only emit changed metrics in metrics log
Date Tue, 06 Feb 2018 08:08:53 GMT
KUDU-2279 (part 2): metrics: only emit changed metrics in metrics log

This adds a global 'metrics epoch' which can be externally incremented.
When metrics are modified, they remember the epoch of their most recent
modification.

When we dump metrics, we can pass a lower bound in order to see only
metrics which have been modified in or after a given epoch.

This patch updates the metrics logging to only emit metrics that have
changed in each successive line. This should substantially reduce the
size and CPU cost of metric logging on servers with thousands of
tablets.

Change-Id: Ia26be99a1fa96d52e2ca0905844d56c096d3778e
Reviewed-on: http://gerrit.cloudera.org:8080/9176
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley <wdberkeley@gmail.com>
Reviewed-by: Mike Percy <mpercy@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ab538740
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ab538740
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ab538740

Branch: refs/heads/master
Commit: ab5387403e42df3161492d6f48b9b51bc85c7499
Parents: adb90a0
Author: Todd Lipcon <todd@apache.org>
Authored: Wed Jan 31 18:19:51 2018 -0800
Committer: Todd Lipcon <todd@apache.org>
Committed: Tue Feb 6 07:58:18 2018 +0000

----------------------------------------------------------------------
 src/kudu/server/server_base.cc | 18 +++++++----
 src/kudu/util/metrics-test.cc  | 61 ++++++++++++++++++++++++++++---------
 src/kudu/util/metrics.cc       | 32 ++++++++++++++++---
 src/kudu/util/metrics.h        | 56 +++++++++++++++++++++++++++++++++-
 4 files changed, 141 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/ab538740/src/kudu/server/server_base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index ebef92c..d094318 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -613,6 +613,8 @@ void ServerBase::MetricsLoggingThread() {
   // logging metrics.
   const MonoDelta kWaitBetweenFailures = MonoDelta::FromSeconds(60);
 
+  MetricJsonOptions opts;
+  opts.include_raw_histograms = true;
 
   MonoTime next_log = MonoTime::Now();
   while (!stop_background_threads_latch_.WaitUntil(next_log)) {
@@ -623,13 +625,10 @@ void ServerBase::MetricsLoggingThread() {
     buf << "metrics " << GetCurrentTimeMicros() << " ";
 
     // Collect the metrics JSON string.
-    vector<string> metrics;
-    metrics.emplace_back("*");
-    MetricJsonOptions opts;
-    opts.include_raw_histograms = true;
-
+    int64_t this_log_epoch = Metric::current_epoch();
+    Metric::IncrementEpoch();
     JsonWriter writer(&buf, JsonWriter::COMPACT);
-    Status s = metric_registry_->WriteAsJson(&writer, metrics, opts);
+    Status s = metric_registry_->WriteAsJson(&writer, {"*"}, opts);
     if (!s.ok()) {
       WARN_NOT_OK(s, "Unable to collect metrics to log");
       next_log += kWaitBetweenFailures;
@@ -644,6 +643,13 @@ void ServerBase::MetricsLoggingThread() {
       next_log += kWaitBetweenFailures;
       continue;
     }
+
+    // Next time we fetch, only show those that changed after the epoch
+    // we just logged.
+    //
+    // NOTE: we only bump this in the successful log case so that if we failed to
+    // write above, we wouldn't skip any changes.
+    opts.only_modified_in_or_after_epoch = this_log_epoch + 1;
   }
 
   WARN_NOT_OK(log.Close(), "Unable to close metric log");

http://git-wip-us.apache.org/repos/asf/kudu/blob/ab538740/src/kudu/util/metrics-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/metrics-test.cc b/src/kudu/util/metrics-test.cc
index 698fd7d..3d2d374 100644
--- a/src/kudu/util/metrics-test.cc
+++ b/src/kudu/util/metrics-test.cc
@@ -36,6 +36,7 @@
 #include "kudu/util/jsonwriter.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
@@ -62,13 +63,13 @@ class MetricsTest : public KuduTest {
   scoped_refptr<MetricEntity> entity_;
 };
 
-METRIC_DEFINE_counter(test_entity, reqs_pending, "Requests Pending", MetricUnit::kRequests,
-                      "Number of requests pending");
+METRIC_DEFINE_counter(test_entity, test_counter, "My Test Counter", MetricUnit::kRequests,
+                      "Description of test counter");
 
 TEST_F(MetricsTest, SimpleCounterTest) {
   scoped_refptr<Counter> requests =
-    new Counter(&METRIC_reqs_pending);
-  ASSERT_EQ("Number of requests pending", requests->prototype()->description());
+    new Counter(&METRIC_test_counter);
+  ASSERT_EQ("Description of test counter", requests->prototype()->description());
   ASSERT_EQ(0, requests->value());
   requests->Increment();
   ASSERT_EQ(1, requests->value());
@@ -76,13 +77,13 @@ TEST_F(MetricsTest, SimpleCounterTest) {
   ASSERT_EQ(3, requests->value());
 }
 
-METRIC_DEFINE_gauge_uint64(test_entity, fake_memory_usage, "Memory Usage",
-                           MetricUnit::kBytes, "Test Gauge 1");
+METRIC_DEFINE_gauge_uint64(test_entity, test_gauge, "Test uint64 Gauge",
+                           MetricUnit::kBytes, "Description of Test Gauge");
 
 TEST_F(MetricsTest, SimpleAtomicGaugeTest) {
   scoped_refptr<AtomicGauge<uint64_t> > mem_usage =
-    METRIC_fake_memory_usage.Instantiate(entity_, 0);
-  ASSERT_EQ(METRIC_fake_memory_usage.description(), mem_usage->prototype()->description());
+    METRIC_test_gauge.Instantiate(entity_, 0);
+  ASSERT_EQ(METRIC_test_gauge.description(), mem_usage->prototype()->description());
   ASSERT_EQ(0, mem_usage->value());
   mem_usage->IncrementBy(7);
   ASSERT_EQ(7, mem_usage->value());
@@ -90,8 +91,8 @@ TEST_F(MetricsTest, SimpleAtomicGaugeTest) {
   ASSERT_EQ(5, mem_usage->value());
 }
 
-METRIC_DEFINE_gauge_int64(test_entity, test_func_gauge, "Test Gauge", MetricUnit::kBytes,
-                          "Test Gauge 2");
+METRIC_DEFINE_gauge_int64(test_entity, test_func_gauge, "Test Function Gauge",
+                          MetricUnit::kBytes, "Test Gauge 2");
 
 static int64_t MyFunction(int* metric_val) {
   return (*metric_val)++;
@@ -176,8 +177,8 @@ TEST_F(MetricsTest, SimpleHistogramTest) {
 }
 
 TEST_F(MetricsTest, JsonPrintTest) {
-  scoped_refptr<Counter> bytes_seen = METRIC_reqs_pending.Instantiate(entity_);
-  bytes_seen->Increment();
+  scoped_refptr<Counter> test_counter = METRIC_test_counter.Instantiate(entity_);
+  test_counter->Increment();
   entity_->SetAttribute("test_attr", "attr_val");
 
   // Generate the JSON.
@@ -194,7 +195,7 @@ TEST_F(MetricsTest, JsonPrintTest) {
   ASSERT_EQ(1, metrics.size());
   string metric_name;
   ASSERT_OK(reader.ExtractString(metrics[0], "name", &metric_name));
-  ASSERT_EQ("reqs_pending", metric_name);
+  ASSERT_EQ("test_counter", metric_name);
   int64_t metric_value;
   ASSERT_OK(reader.ExtractInt64(metrics[0], "value", &metric_value));
   ASSERT_EQ(1L, metric_value);
@@ -216,7 +217,7 @@ TEST_F(MetricsTest, RetirementTest) {
   FLAGS_metrics_retirement_age_ms = 100;
 
   const string kMetricName = "foo";
-  scoped_refptr<Counter> counter = METRIC_reqs_pending.Instantiate(entity_);
+  scoped_refptr<Counter> counter = METRIC_test_counter.Instantiate(entity_);
   ASSERT_EQ(1, entity_->UnsafeMetricsMapForTests().size());
 
   // Since we hold a reference to the counter, it should not get retired.
@@ -287,7 +288,7 @@ TEST_F(MetricsTest, TestDumpJsonPrototypes) {
   const char* expected =
     "        {\n"
     "            \"name\": \"test_func_gauge\",\n"
-    "            \"label\": \"Test Gauge\",\n"
+    "            \"label\": \"Test Function Gauge\",\n"
     "            \"type\": \"gauge\",\n"
     "            \"unit\": \"bytes\",\n"
     "            \"description\": \"Test Gauge 2\",\n"
@@ -316,4 +317,34 @@ TEST_F(MetricsTest, TestDumpJsonPrototypes) {
   ASSERT_TRUE(ContainsKey(seen_metrics, "test_hist"));
 }
 
+TEST_F(MetricsTest, TestDumpOnlyChanged) {
+  auto GetJson = [&](int64_t since_epoch) {
+    MetricJsonOptions opts;
+    opts.only_modified_in_or_after_epoch = since_epoch;
+    std::ostringstream out;
+    JsonWriter writer(&out, JsonWriter::COMPACT);
+    CHECK_OK(entity_->WriteAsJson(&writer, { "*" }, opts));
+    return out.str();
+  };
+
+  scoped_refptr<Counter> test_counter = METRIC_test_counter.Instantiate(entity_);
+
+  int64_t epoch_when_modified = Metric::current_epoch();
+  test_counter->Increment();
+
+  // If we pass a "since dirty" epoch from before we incremented it, we should
+  // see the metric.
+  for (int i = 0; i < 2; i++) {
+    ASSERT_STR_CONTAINS(GetJson(epoch_when_modified), "{\"name\":\"test_counter\",\"value\":1}");
+    Metric::IncrementEpoch();
+  }
+
+  // If we pass a current epoch, we should see that the metric was not modified.
+  int64_t new_epoch = Metric::current_epoch();
+  ASSERT_STR_NOT_CONTAINS(GetJson(new_epoch), "test_counter");
+  // ... until we modify it again.
+  test_counter->Increment();
+  ASSERT_STR_CONTAINS(GetJson(new_epoch), "{\"name\":\"test_counter\",\"value\":2}");
+}
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/ab538740/src/kudu/util/metrics.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/metrics.cc b/src/kudu/util/metrics.cc
index 6565d97..db5b8f5 100644
--- a/src/kudu/util/metrics.cc
+++ b/src/kudu/util/metrics.cc
@@ -249,9 +249,10 @@ Status MetricEntity::WriteAsJson(JsonWriter* writer,
   writer->String("metrics");
   writer->StartArray();
   for (OrderedMetricMap::value_type& val : metrics) {
-    WARN_NOT_OK(val.second->WriteAsJson(writer, opts),
-                strings::Substitute("Failed to write $0 as JSON", val.first));
-
+    if (val.second->ModifiedInOrAfterEpoch(opts.only_modified_in_or_after_epoch)) {
+      WARN_NOT_OK(val.second->WriteAsJson(writer, opts),
+                  strings::Substitute("Failed to write $0 as JSON", val.first));
+    }
   }
   writer->EndArray();
 
@@ -493,13 +494,32 @@ scoped_refptr<MetricEntity> MetricRegistry::FindOrCreateEntity(
 //
 // Metric
 //
+
+std::atomic<int64_t> Metric::g_epoch_;
+
 Metric::Metric(const MetricPrototype* prototype)
-  : prototype_(prototype) {
+    : prototype_(prototype),
+      m_epoch_(current_epoch()) {
 }
 
 Metric::~Metric() {
 }
 
+void Metric::IncrementEpoch() {
+  g_epoch_++;
+}
+
+void Metric::UpdateModificationEpochSlowPath() {
+  int64_t new_epoch, old_epoch;
+  // CAS loop to ensure that we never transition a metric's epoch backwards
+  // even if multiple threads race to update it.
+  do {
+    old_epoch = m_epoch_;
+    new_epoch = g_epoch_;
+  } while (old_epoch < new_epoch &&
+           !m_epoch_.compare_exchange_weak(old_epoch, new_epoch));
+}
+
 //
 // Gauge
 //
@@ -531,6 +551,7 @@ std::string StringGauge::value() const {
 }
 
 void StringGauge::set_value(const std::string& value) {
+  UpdateModificationEpoch();
   std::lock_guard<simple_spinlock> l(lock_);
   value_ = value;
 }
@@ -560,6 +581,7 @@ void Counter::Increment() {
 }
 
 void Counter::IncrementBy(int64_t amount) {
+  UpdateModificationEpoch();
   value_.IncrementBy(amount);
 }
 
@@ -609,10 +631,12 @@ Histogram::Histogram(const HistogramPrototype* proto)
 }
 
 void Histogram::Increment(int64_t value) {
+  UpdateModificationEpoch();
   histogram_->Increment(value);
 }
 
 void Histogram::IncrementBy(int64_t value, int64_t amount) {
+  UpdateModificationEpoch();
   histogram_->IncrementBy(value, amount);
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/ab538740/src/kudu/util/metrics.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/metrics.h b/src/kudu/util/metrics.h
index aa553c9..9e1cac4 100644
--- a/src/kudu/util/metrics.h
+++ b/src/kudu/util/metrics.h
@@ -223,8 +223,10 @@
 //
 /////////////////////////////////////////////////////
 
+#include <atomic>
 #include <cstddef>
 #include <cstdint>
+#include <limits>
 #include <mutex>
 #include <string>
 #include <unordered_map>
@@ -420,6 +422,13 @@ struct MetricJsonOptions {
   // unit, etc).
   // Default: false
   bool include_schema_info;
+
+  // Try to skip any metrics which have not been modified in or after
+  // the given epoch. The current epoch can be fetched using
+  // Metric::current_epoch() and incremented using Metric::IncrementEpoch().
+  //
+  // Note that this is an inclusive bound.
+  int64_t only_modified_in_or_after_epoch = 0;
 };
 
 class MetricEntityPrototype {
@@ -555,13 +564,49 @@ class Metric : public RefCountedThreadSafe<Metric> {
 
   const MetricPrototype* prototype() const { return prototype_; }
 
+  // Return true if this metric has changed in or after the given metrics epoch.
+  bool ModifiedInOrAfterEpoch(int64_t epoch) {
+    return m_epoch_ >= epoch;
+  }
+
+  // Return the current epoch for tracking modification of metrics.
+  // This can be passed as 'MetricJsonOptions::only_modified_in_or_after_epoch' to
+  // get a diff of metrics between two points in time.
+  static int64_t current_epoch() {
+    return g_epoch_;
+  }
+
+  // Advance to the next epoch for metrics.
+  // This is cheap for the calling thread but causes some extra work on the paths
+  // of hot metric updaters, so should only be done rarely (eg before dumping
+  // metrics).
+  static void IncrementEpoch();
+
  protected:
   explicit Metric(const MetricPrototype* prototype);
   virtual ~Metric();
 
   const MetricPrototype* const prototype_;
 
+  void UpdateModificationEpoch() {
+    // If our recorded epoch is older than the current one, advance it. We test before
+    // writing so the common case does not contend on writes to this cacheline.
+    if (m_epoch_ < current_epoch()) {
+      // Out-of-line the uncommon case which requires a bit more code.
+      UpdateModificationEpochSlowPath();
+    }
+  }
+
+  // The last metrics epoch in which this metric was modified.
+  // We use epochs instead of timestamps since we can ensure that epochs
+  // only change rarely. Thus this member is read-mostly and doesn't cause
+  // cacheline bouncing between metrics writers. We also don't need to read
+  // the system clock, which is more expensive compared to reading 'g_epoch_'.
+  std::atomic<int64_t> m_epoch_;
+
  private:
+  void UpdateModificationEpochSlowPath();
+
   friend class MetricEntity;
   friend class RefCountedThreadSafe<Metric>;
 
@@ -570,6 +615,9 @@ class Metric : public RefCountedThreadSafe<Metric> {
   // uninitialized.
   MonoTime retire_time_;
 
+  // See 'current_epoch()'.
+  static std::atomic<int64_t> g_epoch_;
+
   DISALLOW_COPY_AND_ASSIGN(Metric);
 };
 
@@ -794,9 +842,11 @@ class AtomicGauge : public Gauge {
     value_.Store(static_cast<int64_t>(value), kMemOrderNoBarrier);
   }
   void Increment() {
+    UpdateModificationEpoch();
     value_.IncrementBy(1, kMemOrderNoBarrier);
   }
   virtual void IncrementBy(int64_t amount) {
+    UpdateModificationEpoch();
     value_.IncrementBy(amount, kMemOrderNoBarrier);
   }
   void Decrement() {
@@ -925,7 +975,11 @@ class FunctionGauge : public Gauge {
   friend class MetricEntity;
 
   FunctionGauge(const GaugePrototype<T>* proto, Callback<T()> function)
-      : Gauge(proto), function_(std::move(function)) {}
+      : Gauge(proto), function_(std::move(function)) {
+    // Override the modification epoch to the maximum, since we don't have any idea
+    // when the bound function changes value.
+    m_epoch_ = std::numeric_limits<decltype(m_epoch_.load())>::max();
+  }
 
   static T Return(T v) {
     return v;


Mime
View raw message