kudu-commits mailing list archives

From: a...@apache.org
Subject: [kudu] 01/08: Reduce delta compaction logging
Date: Wed, 23 Jan 2019 17:54:15 GMT
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit f336020da4d12d135813c4ab68954ef104885d77
Author: Will Berkeley <wdberkeley@gmail.org>
AuthorDate: Thu Jan 17 15:54:35 2019 -0800

    Reduce delta compaction logging
    
    Delta compaction produces verbose and repetitive logs. This patch
    reduces the logging from something like this:
    
    I0117 16:15:46.504104 153677824 maintenance_manager.cc:304] P 91d1117cc2aa45debfd9e7eb83bdad56: Scheduling MajorDeltaCompactionOp(bd70598ebd5d4d7d95b78471f09b2bc2): perf score=1.000000
    I0117 16:15:46.504354 123629568 diskrowset.cc:578] T bd70598ebd5d4d7d95b78471f09b2bc2 P 91d1117cc2aa45debfd9e7eb83bdad56: RowSet(0): Major compacting REDO delta stores (cols: 11 12)
    I0117 16:15:46.504496 123629568 delta_compaction.cc:318] Starting major delta compaction for columns string STRING NOT NULL, int64 INT64 NOT NULL
    I0117 16:15:46.505794 123629568 multi_column_writer.cc:98] Opened CFile writers for 2 column(s)
    I0117 16:15:46.674464 123629568 delta_compaction.cc:331] Finished major delta compaction of columns string STRING NOT NULL, int64 INT64 NOT NULL. Compacted 2 delta files. Overall stats: delete_count=0, reinsert_count=0, update_count=49948
    I0117 16:15:46.739264 123629568 maintenance_manager.cc:494] P 91d1117cc2aa45debfd9e7eb83bdad56: MajorDeltaCompactionOp(bd70598ebd5d4d7d95b78471f09b2bc2) complete. Timing: real 0.235s	user 0.164s	sys 0.008s Metrics: {"cfile_cache_hit":31,"cfile_cache_hit_bytes":159531,"cfile_cache_miss":11,"cfile_cache_miss_bytes":67765,"cfile_init":3,"delta_iterators_relevant":2}
    
    to something like this:
    
    I0117 16:20:11.734989 165629952 maintenance_manager.cc:304] P 9340476e2ce842c3b9fbf0fc22b3603a: Scheduling MajorDeltaCompactionOp(84947903910c449c86722ae727b1c9ff): perf score=1.000000
    I0117 16:20:11.735800 135581696 multi_column_writer.cc:98] Opened CFile writers for 2 column(s)
    I0117 16:20:11.985893 135581696 maintenance_manager.cc:494] P 9340476e2ce842c3b9fbf0fc22b3603a: MajorDeltaCompactionOp(84947903910c449c86722ae727b1c9ff) complete. Timing: real 0.251s	user 0.166s	sys 0.009s Metrics: {"cfile_cache_hit":31,"cfile_cache_hit_bytes":165814,"cfile_cache_miss":10,"cfile_cache_miss_bytes":60929,"cfile_init":3,"delete_count":0,"delta_blocks_compacted":2,"delta_iterators_relevant":2,"reinsert_count":0,"update_count":49864}
    
    The intermediate messages from the delta compaction code itself have been
    moved to VLOG(1). Four additional trace metrics are added to provide the
    most useful information from the old INFO messages:
    
    - The number of delta blocks compacted
    - The number of deletes, reinserts, and updates in the delta blocks.
      Note that, as is standard for delta blocks, updates are counted per
      column, so e.g. updating '(key=0, foo=1, bar=2)' to
      '(key=0, foo=3, bar=55)' counts as 2 updates: one for 'foo' and one
      for 'bar'.
    
    The above comparison was like-for-like, but I also want to point out
    that the list of columns and per-column counts in some messages can
    get very long:
    
    I0113 21:55:33.509917 63805 delta_compaction.cc:294] Starting major delta compaction for columns giclnt[string NOT NULL] gicfac[string NOT NULL] gicdiv[string NOT NULL] gifref[string NOT NULL] giatdc[string NOT NULL] gifcls[string NOT NULL] giatyp[string NOT NULL] gisrvd[string NOT NULL] giadmd[string NOT NULL] gibild[string NOT NULL] gidscd[string NOT NULL] gidrgc[string NOT NULL] gichga[decimal(11, 2) NOT NULL] gibaln[decimal(11, 2) NOT NULL] gibddt[string NOT NULL] gibmld[string NO [...]
    
    and
    
    I0113 21:55:33.510085 63805 delta_compaction.cc:298] Preparing to major compact delta file: 0000000294776126 (ts range=[6334544750136942592, 6337682405990748160], delete_count=[0], reinsert_count=[0], update_counts_by_col_id=[46:24,45:24,44:24,43:24,42:24,41:24,40:24,39:24,38:24,37:24,36:24,35:24,34:24,33:24,32:24,31:24,30:24,29:24,28:24,27:24,26:24,25:24,24:24,23:24,10:24,9:24,8:24,7:24,6:24,5:24,4:24,3:24,2:24,1:24,11:24,60:24,13:24,61:24,14:24,62:24,15:24,63:24,16:24,64:24,17:24,65 [...]
    
    Therefore, the reduction in the amount of logging can be very
    significant.
    
    Change-Id: Id6fb108e8c09c70de4da099ed7b15b0060f2b28b
    Reviewed-on: http://gerrit.cloudera.org:8080/12233
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <awong@cloudera.com>
---
 src/kudu/tablet/delta_compaction.cc  | 41 +++++++++++++++++++++++++-----------
 src/kudu/tablet/delta_store.h        |  2 +-
 src/kudu/tablet/delta_tracker.cc     | 37 +++++++++++++++++++++-----------
 src/kudu/tablet/diskrowset.cc        |  2 +-
 src/kudu/util/logging.h              |  2 +-
 src/kudu/util/maintenance_manager.cc |  7 +++---
 6 files changed, 61 insertions(+), 30 deletions(-)

diff --git a/src/kudu/tablet/delta_compaction.cc b/src/kudu/tablet/delta_compaction.cc
index 475306a..f597152 100644
--- a/src/kudu/tablet/delta_compaction.cc
+++ b/src/kudu/tablet/delta_compaction.cc
@@ -50,6 +50,7 @@
 #include "kudu/tablet/mvcc.h"
 #include "kudu/tablet/rowset_metadata.h"
 #include "kudu/util/memory/arena.h"
+#include "kudu/util/trace.h"
 
 using std::shared_ptr;
 
@@ -294,10 +295,22 @@ Status MajorDeltaCompaction::OpenUndoDeltaFileWriter() {
 }
 
 namespace {
-string DeltaStoreStatsToString(const vector<shared_ptr<DeltaStore>>& stores) {
-  uint64_t delete_count = 0;
-  uint64_t reinsert_count = 0;
-  uint64_t update_count = 0;
+struct DeltaStoreStats {
+  int64_t delete_count;
+  int64_t reinsert_count;
+  int64_t update_count;
+
+ public:
+  string ToString() const {
+    return Substitute("delete_count=$0, reinsert_count=$1, update_count=$2",
+                      delete_count, reinsert_count, update_count);
+  }
+};
+
+DeltaStoreStats ComputeDeltaStoreStats(const SharedDeltaStoreVector& stores) {
+  int64_t delete_count = 0;
+  int64_t reinsert_count = 0;
+  int64_t update_count = 0;
   for (const auto& store : stores) {
     if (!store->Initted()) {
       continue;
@@ -307,15 +320,14 @@ string DeltaStoreStatsToString(const vector<shared_ptr<DeltaStore>>& stores) {
     reinsert_count += stats.reinsert_count();
     update_count += stats.UpdateCount();
   }
-  return Substitute("delete_count=$0, reinsert_count=$1, update_count=$2",
-                    delete_count, reinsert_count, update_count);
+  return DeltaStoreStats{delete_count, reinsert_count, update_count};
 }
 } // anonymous namespace
 
 Status MajorDeltaCompaction::Compact(const IOContext* io_context) {
   CHECK_EQ(state_, kInitialized);
 
-  LOG(INFO) << "Starting major delta compaction for columns " << ColumnNamesToString();
+  VLOG(1) << "Starting major delta compaction for columns " << ColumnNamesToString();
   RETURN_NOT_OK(base_schema_.CreateProjectionByIdsIgnoreMissing(column_ids_, &partial_schema_));
 
   if (VLOG_IS_ON(1)) {
@@ -328,11 +340,16 @@ Status MajorDeltaCompaction::Compact(const IOContext* io_context) {
   RETURN_NOT_OK(OpenBaseDataWriter());
   RETURN_NOT_OK(FlushRowSetAndDeltas(io_context));
 
-  LOG(INFO) << Substitute("Finished major delta compaction of columns $0. "
-                          "Compacted $1 delta files. Overall stats: $2",
-                          ColumnNamesToString(),
-                          included_stores_.size(),
-                          DeltaStoreStatsToString(included_stores_));
+  TRACE_COUNTER_INCREMENT("delta_blocks_compacted", included_stores_.size());
+  const auto stats = ComputeDeltaStoreStats(included_stores_);
+  TRACE_COUNTER_INCREMENT("delete_count", stats.delete_count);
+  TRACE_COUNTER_INCREMENT("reinsert_count", stats.reinsert_count);
+  TRACE_COUNTER_INCREMENT("update_count", stats.update_count);
+  VLOG(1) << Substitute("Finished major delta compaction of columns $0. "
+                        "Compacted $1 delta files. Overall stats: $2",
+                        ColumnNamesToString(),
+                        included_stores_.size(),
+                        stats.ToString());
   return Status::OK();
 }
 
diff --git a/src/kudu/tablet/delta_store.h b/src/kudu/tablet/delta_store.h
index 1f5f82b..269f353 100644
--- a/src/kudu/tablet/delta_store.h
+++ b/src/kudu/tablet/delta_store.h
@@ -96,7 +96,7 @@ class DeltaStore {
   virtual ~DeltaStore() {}
 };
 
-typedef std::vector<std::shared_ptr<DeltaStore> > SharedDeltaStoreVector;
+typedef std::vector<std::shared_ptr<DeltaStore>> SharedDeltaStoreVector;
 
 // Iterator over deltas.
 // For each rowset, this iterator is constructed alongside the base data iterator,
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 9b4fd2b..4b75769 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -52,6 +52,7 @@
 #include "kudu/util/logging.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
+#include "kudu/util/trace.h"
 
 namespace kudu {
 
@@ -164,13 +165,16 @@ Status DeltaTracker::DoOpen(const IOContext* io_context) {
 Status DeltaTracker::MakeDeltaIteratorMergerUnlocked(const IOContext* io_context,
                                                      size_t start_idx, size_t end_idx,
                                                      const Schema* projection,
-                                                     vector<shared_ptr<DeltaStore> > *target_stores,
+                                                     SharedDeltaStoreVector* target_stores,
                                                      vector<BlockId> *target_blocks,
-                                                     std::unique_ptr<DeltaIterator> *out) {
+                                                     std::unique_ptr<DeltaIterator>* out) {
   CHECK(open_);
   CHECK_LE(start_idx, end_idx);
   CHECK_LT(end_idx, redo_delta_stores_.size());
-  vector<shared_ptr<DeltaStore> > inputs;
+  SharedDeltaStoreVector inputs;
+  int64_t delete_count = 0;
+  int64_t reinsert_count = 0;
+  int64_t update_count = 0;
   for (size_t idx = start_idx; idx <= end_idx; ++idx) {
     shared_ptr<DeltaStore> &delta_store = redo_delta_stores_[idx];
 
@@ -179,12 +183,21 @@ Status DeltaTracker::MakeDeltaIteratorMergerUnlocked(const IOContext* io_context
     ignore_result(down_cast<DeltaFileReader*>(delta_store.get()));
     shared_ptr<DeltaFileReader> dfr = std::static_pointer_cast<DeltaFileReader>(delta_store);
 
-    LOG_WITH_PREFIX(INFO) << "Preparing to minor compact delta file: " << dfr->ToString();
+    if (dfr->Initted()) {
+      delete_count += dfr->delta_stats().delete_count();
+      reinsert_count += dfr->delta_stats().reinsert_count();
+      update_count += dfr->delta_stats().UpdateCount();
+    }
+    VLOG_WITH_PREFIX(1) << "Preparing to minor compact delta file: "
+                        << dfr->ToString();
 
     inputs.push_back(delta_store);
     target_stores->push_back(delta_store);
     target_blocks->push_back(dfr->block_id());
   }
+  TRACE_COUNTER_INCREMENT("delete_count", delete_count);
+  TRACE_COUNTER_INCREMENT("reinsert_count", reinsert_count);
+  TRACE_COUNTER_INCREMENT("update_count", update_count);
   RowIteratorOptions opts;
   opts.projection = projection;
   opts.io_context = io_context;
@@ -411,11 +424,13 @@ Status DeltaTracker::CompactStores(const IOContext* io_context, int start_idx, i
   RowSetMetadataUpdate update;
   update.ReplaceRedoDeltaBlocks(compacted_blocks, new_blocks);
 
-  LOG_WITH_PREFIX(INFO) << Substitute("Flushing compaction of $0 redo delta "
-                                      "blocks { $1 } into block $2",
-                                      compacted_blocks.size(),
-                                      BlockId::JoinStrings(compacted_blocks),
-                                      new_block_id.ToString());
+  const auto num_blocks_compacted = compacted_blocks.size();
+  TRACE_COUNTER_INCREMENT("delta_blocks_compacted", num_blocks_compacted);
+  VLOG_WITH_PREFIX(1) << Substitute("Flushing compaction of $0 redo delta "
+                                    "blocks { $1 } into block $2",
+                                    num_blocks_compacted,
+                                    BlockId::JoinStrings(compacted_blocks),
+                                    new_block_id.ToString());
   RETURN_NOT_OK_PREPEND(CommitDeltaStoreMetadataUpdate(update, compacted_stores, new_blocks,
                                                        io_context, REDO, FLUSH_METADATA),
                         "DeltaTracker: CompactStores: Unable to commit delta update");
@@ -536,7 +551,7 @@ Status DeltaTracker::DeleteAncientUndoDeltas(Timestamp ancient_history_mark,
 Status DeltaTracker::DoCompactStores(const IOContext* io_context,
                                      size_t start_idx, size_t end_idx,
                                      unique_ptr<WritableBlock> block,
-                                     vector<shared_ptr<DeltaStore> > *compacted_stores,
+                                     SharedDeltaStoreVector* compacted_stores,
                                      vector<BlockId> *compacted_blocks) {
   unique_ptr<DeltaIterator> inputs_merge;
 
@@ -548,14 +563,12 @@ Status DeltaTracker::DoCompactStores(const IOContext* io_context,
   RETURN_NOT_OK(MakeDeltaIteratorMergerUnlocked(io_context, start_idx, end_idx,
                                                 &empty_schema, compacted_stores,
                                                 compacted_blocks, &inputs_merge));
-  LOG_WITH_PREFIX(INFO) << "Compacting " << (end_idx - start_idx + 1) << " delta files.";
   DeltaFileWriter dfw(std::move(block));
   RETURN_NOT_OK(dfw.Start());
   RETURN_NOT_OK(WriteDeltaIteratorToFile<REDO>(inputs_merge.get(),
                                                ITERATE_OVER_ALL_ROWS,
                                                &dfw));
   RETURN_NOT_OK(dfw.Finish());
-  LOG_WITH_PREFIX(INFO) << "Succesfully compacted the specified delta files.";
   return Status::OK();
 }
 
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index 6640822..066385d 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -575,7 +575,7 @@ Status DiskRowSet::MajorCompactDeltaStores(const IOContext* io_context,
 Status DiskRowSet::MajorCompactDeltaStoresWithColumnIds(const vector<ColumnId>& col_ids,
                                                         const IOContext* io_context,
                                                         HistoryGcOpts history_gc_opts) {
-  LOG_WITH_PREFIX(INFO) << "Major compacting REDO delta stores (cols: " << col_ids << ")";
+  VLOG_WITH_PREFIX(1) << "Major compacting REDO delta stores (cols: " << col_ids << ")";
   TRACE_EVENT0("tablet", "DiskRowSet::MajorCompactDeltaStoresWithColumnIds");
   std::lock_guard<Mutex> l(*delta_tracker()->compact_flush_lock());
   RETURN_NOT_OK(delta_tracker()->CheckWritableUnlocked());
diff --git a/src/kudu/util/logging.h b/src/kudu/util/logging.h
index e041831..87012b1 100644
--- a/src/kudu/util/logging.h
+++ b/src/kudu/util/logging.h
@@ -351,7 +351,7 @@ std::ostream& operator<<(std::ostream &os, const PRIVATE_ThrottleMsg&);
 
 // Convenience macros to prefix log messages with some prefix, these are the unlocked
 // versions and should not obtain a lock (if one is required to obtain the prefix).
-// There must be a LogPrefixUnlocked()/LogPrefixLocked() method available in the current
+// There must be a LogPrefixUnlocked()/LogPrefix() method available in the current
 // scope in order to use these macros.
 #define LOG_WITH_PREFIX_UNLOCKED(severity) LOG(severity) << LogPrefixUnlocked()
 #define VLOG_WITH_PREFIX_UNLOCKED(verboselevel) LOG_IF(INFO, VLOG_IS_ON(verboselevel)) \
diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc
index 9a42464..a1909bb 100644
--- a/src/kudu/util/maintenance_manager.cc
+++ b/src/kudu/util/maintenance_manager.cc
@@ -491,9 +491,10 @@ void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
     op->Perform();
     sw.stop();
   }
-  LOG_WITH_PREFIX(INFO) << op->name() << " complete. "
-                        << "Timing: " << sw.elapsed().ToString()
-                        << " Metrics: " << trace->MetricsAsJSON();
+  LOG_WITH_PREFIX(INFO) << Substitute("$0 complete. Timing: $1 Metrics: $2",
+                                      op->name(),
+                                      sw.elapsed().ToString(),
+                                      trace->MetricsAsJSON());
 }
 
 void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb) {

